Commit d3c5320b authored by hjxilinx

Merge branch 'develop' into feature/liaohj

@@ -8,7 +8,7 @@ C/C++ APIs are similar to the MySQL APIs. Applications should include TDengine h
```C
#include <taos.h>
```
Make sure TDengine library _libtaos.so_ is installed and use _-ltaos_ option to link the library when compiling. In most cases, if the return value of an API is an integer, _0_ means success and other values are error codes indicating the failure reason; if the return value is a pointer, _NULL_ indicates failure.
### C/C++ sync API
@@ -78,6 +78,51 @@ The 12 APIs are the most important APIs frequently used. Users can check _taos.h
**Note**: The connection to a TDengine server is not multi-thread safe. So a connection can only be used by one thread.
### C/C++ parameter binding API
TDengine also provides parameter binding APIs. As in MySQL, only a question mark `?` can be used to represent a parameter to be bound in these APIs.
- `TAOS_STMT* taos_stmt_init(TAOS *taos)`
Create a TAOS_STMT object representing a prepared statement for the other APIs.
- `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)`
Parse the SQL statement _sql_ and bind the result to _stmt_. If _length_ is larger than 0, its value is used as the length of _sql_; otherwise the API detects the actual length of _sql_ automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
Bind values to the parameters. _bind_ points to an array; the element count and order of the array must exactly match the parameters of the SQL statement. _TAOS_BIND_ is used in the same way as _MYSQL_BIND_ in MySQL; its definition is as below:
```c
typedef struct TAOS_BIND {
  int            buffer_type;
  void *         buffer;
  unsigned long  buffer_length;  // not used in TDengine
  unsigned long *length;
  int *          is_null;
  int            is_unsigned;    // not used in TDengine
  int *          error;          // not used in TDengine
} TAOS_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
Add the bound parameters to the batch; the client can call `taos_stmt_bind_param` again after calling this API. Note this API only supports _insert_ / _import_ statements; it returns an error in other cases.
- `int taos_stmt_execute(TAOS_STMT *stmt)`
Execute the prepared statement. This API can only be called once for a statement at present.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Acquire the result set of an executed statement. The result set is used in the same way as with `taos_use_result`; `taos_free_result` must be called after you are done with the result set to release resources.
- `int taos_stmt_close(TAOS_STMT *stmt)`
Close the statement and release all resources.
### C/C++ async API
In addition to the sync APIs, TDengine also provides async APIs. Async APIs return right away without waiting for a response from the server, allowing the application to continue with other tasks without blocking. This makes them more efficient, and especially useful over a poor network.
......
@@ -4,13 +4,13 @@ TDengine provides a rich set of application development interfaces, including C/C++, JAVA,
## C/C++ Connector
The C/C++ API is similar to the MySQL C API. Applications must include the TDengine header file _taos.h_ (after installation, located at _/usr/local/taos/include_):
```C
#include <taos.h>
```
When compiling, link the TDengine dynamic library _libtaos.so_ (after installation, located at _/usr/local/taos/driver_; when compiling with gcc, add the -ltaos option). Unless otherwise specified, when an API's return value is an integer, _0_ means success and other values are error codes indicating the failure reason; when the return value is a pointer, _NULL_ indicates failure.
### C/C++ sync API
@@ -79,6 +79,51 @@ The C/C++ API is similar to the MySQL C API. Applications must include the TDengine
**Note**: For a single database connection, only one thread may use it to call APIs at any given time; otherwise the behavior is undefined and may crash the client. A client application can perform multi-threaded data writes or queries by establishing multiple connections.
### C/C++ parameter binding API
Besides calling `taos_query` directly for queries, TDengine also provides prepare APIs that support parameter binding. As in MySQL, these APIs currently only support a question mark `?` as the placeholder for a parameter to be bound. The details are as follows:
- `TAOS_STMT* taos_stmt_init(TAOS *taos)`
Create a TAOS_STMT object for use in subsequent calls.
- `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)`
Parse an SQL statement and bind the parsing result and parameter information to stmt. If the parameter length is larger than 0, its value is used as the length of the SQL statement; if it equals 0, the length of the SQL statement is determined automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
Bind the parameters. bind points to an array; the number and order of its elements must exactly match the parameters in the SQL statement. TAOS_BIND is used in the same way as MYSQL_BIND in MySQL and is defined as follows:
```c
typedef struct TAOS_BIND {
  int            buffer_type;
  void *         buffer;
  unsigned long  buffer_length;  // not actually used
  unsigned long *length;
  int *          is_null;
  int            is_unsigned;    // not actually used
  int *          error;          // not actually used
} TAOS_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
Add the currently bound parameters to the batch. After calling this function, `taos_stmt_bind_param` can be called again to bind new parameters. Note that this function only supports insert/import statements; for select and other SQL statements it returns an error.
- `int taos_stmt_execute(TAOS_STMT *stmt)`
Execute the prepared statement. Currently, a statement can only be executed once.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Acquire the result set of the statement. The result set is used in the same way as in a non-parameterized call; when you are done with it, call `taos_free_result` on it to release resources.
- `int taos_stmt_close(TAOS_STMT *stmt)`
Finish execution and release all resources.
### C/C++ async API
Besides the sync APIs, TDengine also provides higher-performance async APIs for data insertion and query operations. Under identical hardware and software conditions, the async APIs insert data 2~4 times faster than the sync APIs. Async APIs are non-blocking: they return immediately, before the requested database operation actually completes, so the calling thread can handle other work, improving the performance of the whole application. Async APIs are especially advantageous under severe network latency.
......
@@ -154,7 +154,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, int port) {
  if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
#ifdef CLUSTER
    ip = tsPrivateIp;
#else
......
@@ -21,40 +21,41 @@ extern "C" {
#endif
typedef void *tmr_h;
typedef void (*TAOS_TMR_CALLBACK)(void *, void *);
extern uint32_t tmrDebugFlag;
extern int      taosTmrThreads;
#define tmrError(...)                                 \
  do { if (tmrDebugFlag & DEBUG_ERROR) {              \
    tprintf("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)
#define tmrWarn(...)                                 \
  do { if (tmrDebugFlag & DEBUG_WARN) {              \
    tprintf("WARN TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)
#define tmrTrace(...)                           \
  do { if (tmrDebugFlag & DEBUG_TRACE) {        \
    tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \
  } } while(0)
#define MAX_NUM_OF_TMRCTL 32
#define MSECONDS_PER_TICK 5
void *taosTmrInit(int maxTmr, int resoultion, int longest, const char *label);
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle);
bool  taosTmrStop(tmr_h tmrId);
bool  taosTmrStopA(tmr_h *timerId);
bool  taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle, tmr_h *pTmrId);
void  taosTmrCleanUp(void *handle);
#ifdef __cplusplus
}
#endif
......
@@ -92,7 +92,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
    return false;
  }
  int precision = taos_result_precision(result);
  // such as select count(*) from sys.cpu
  // such as select count(*) from sys.cpu group by ipaddr
@@ -151,7 +151,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
            snprintf(target, HTTP_GC_TARGET_SIZE, "%s%s", aliasBuffer, (char *)row[groupFields]);
            break;
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (precision == TSDB_TIME_PRECISION_MILLI) {
              snprintf(target, HTTP_GC_TARGET_SIZE, "%s%ld", aliasBuffer, *((int64_t *) row[groupFields]));
            } else {
              snprintf(target, HTTP_GC_TARGET_SIZE, "%s%ld", aliasBuffer, *((int64_t *) row[groupFields]) / 1000);
@@ -210,7 +210,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
          httpJsonStringForTransMean(jsonBuf, row[i], fields[i].bytes);
          break;
        case TSDB_DATA_TYPE_TIMESTAMP:
          if (precision == TSDB_TIME_PRECISION_MILLI) { //ms
            httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
          } else {
            httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
          }
          break;
        default:
          httpJsonString(jsonBuf, "invalidcol", 10);
......
@@ -55,10 +55,44 @@
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
@@ -72,14 +72,43 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
@@ -29,6 +29,7 @@
#include <math.h>
#include <string.h>
#include <assert.h>
#include <intrin.h>
#ifdef __cplusplus
extern "C" {
@@ -78,12 +79,75 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) send(fd, buf, len, 0)
#define taosReadSocket(fd, buf, len) recv(fd, buf, len, 0)
#if defined(_M_ARM) || defined(_M_ARM64)
#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr))
#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr))
#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr))
#define atomic_load_64(ptr) __iso_volatile_load64((const volatile __int64*)(ptr))
#define atomic_store_8(ptr, val) __iso_volatile_store8((volatile __int8*)(ptr), (__int8)(val))
#define atomic_store_16(ptr, val) __iso_volatile_store16((volatile __int16*)(ptr), (__int16)(val))
#define atomic_store_32(ptr, val) __iso_volatile_store32((volatile __int32*)(ptr), (__int32)(val))
#define atomic_store_64(ptr, val) __iso_volatile_store64((volatile __int64*)(ptr), (__int64)(val))
#ifdef _M_ARM64
#define atomic_load_ptr atomic_load_64
#define atomic_store_ptr atomic_store_64
#else
#define atomic_load_ptr atomic_load_32
#define atomic_store_ptr atomic_store_32
#endif
#else
#define atomic_load_8(ptr) (*(char volatile*)(ptr))
#define atomic_load_16(ptr) (*(short volatile*)(ptr))
#define atomic_load_32(ptr) (*(long volatile*)(ptr))
#define atomic_load_64(ptr) (*(__int64 volatile*)(ptr))
#define atomic_load_ptr(ptr) (*(void* volatile*)(ptr))
#define atomic_store_8(ptr, val) ((*(char volatile*)(ptr)) = (char)(val))
#define atomic_store_16(ptr, val) ((*(short volatile*)(ptr)) = (short)(val))
#define atomic_store_32(ptr, val) ((*(long volatile*)(ptr)) = (long)(val))
#define atomic_store_64(ptr, val) ((*(__int64 volatile*)(ptr)) = (__int64)(val))
#define atomic_store_ptr(ptr, val) ((*(void* volatile*)(ptr)) = (void*)(val))
#endif
#define atomic_exchange_8(ptr, val) _InterlockedExchange8((char volatile*)(ptr), (char)(val))
#define atomic_exchange_16(ptr, val) _InterlockedExchange16((short volatile*)(ptr), (short)(val))
#define atomic_exchange_32(ptr, val) _InterlockedExchange((long volatile*)(ptr), (long)(val))
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char interlocked_add_8(char volatile *ptr, char val);
short interlocked_add_16(short volatile *ptr, short val);
long interlocked_add_32(long volatile *ptr, long val);
__int64 interlocked_add_64(__int64 volatile *ptr, __int64 val);
#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val))
#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val))
#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val))
#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_64
#else
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_32
#endif
#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val))
#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val))
#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val))
#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val))
#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val))
int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
@@ -43,8 +43,11 @@ void taosResetPthread(pthread_t *thread) {
}
int64_t taosGetPthreadId() {
#ifdef PTW32_VERSION
  return pthread_getw32threadid_np(pthread_self());
#else
  return (int64_t)pthread_self();
#endif
}
int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) {
@@ -63,28 +66,21 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
  return setsockopt(socketfd, level, optname, optval, optlen);
}
char interlocked_add_8(char volatile* ptr, char val) {
  return _InterlockedExchangeAdd8(ptr, val) + val;
}
short interlocked_add_16(short volatile* ptr, short val) {
  return _InterlockedExchangeAdd16(ptr, val) + val;
}
long interlocked_add_32(long volatile* ptr, long val) {
  return _InterlockedExchangeAdd(ptr, val) + val;
}
__int64 interlocked_add_64(__int64 volatile* ptr, __int64 val) {
  return _InterlockedExchangeAdd64(ptr, val) + val;
}
int32_t __sync_val_load_32(int32_t *ptr) {
......
...@@ -16,555 +16,539 @@ ...@@ -16,555 +16,539 @@
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <sched.h>
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include "os.h" #include "os.h"
#include "tidpool.h"
#include "tlog.h" #include "tlog.h"
#include "tsched.h" #include "tsched.h"
#include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
// special mempool without mutex
#define mpool_h void *
typedef struct {
int numOfFree; /* number of free slots */
int first; /* the first free slot */
int numOfBlock; /* the number of blocks */
int blockSize; /* block size in bytes */
int * freeList; /* the index list */
char *pool; /* the actual mem block */
} pool_t;
mpool_h tmrMemPoolInit(int maxNum, int blockSize);
char *tmrMemPoolMalloc(mpool_h handle);
void tmrMemPoolFree(mpool_h handle, char *p);
void tmrMemPoolCleanUp(mpool_h handle);
typedef struct _tmr_obj {
void *param1;
void (*fp)(void *, void *);
tmr_h timerId;
short cycle;
struct _tmr_obj * prev;
struct _tmr_obj * next;
int index;
struct _tmr_ctrl_t *pCtrl;
} tmr_obj_t;
typedef struct { #define TIMER_STATE_WAITING 0
tmr_obj_t *head; #define TIMER_STATE_EXPIRED 1
int count; #define TIMER_STATE_STOPPED 2
} tmr_list_t; #define TIMER_STATE_CANCELED 3
typedef struct _tmr_ctrl_t { typedef union _tmr_ctrl_t {
void * signature; char label[16];
pthread_mutex_t mutex; /* mutex to protect critical resource */ struct {
int resolution; /* resolution in mseconds */ // pad to ensure 'next' is the end of this union
int numOfPeriods; /* total number of periods */ char padding[16 - sizeof(union _tmr_ctrl_t*)];
int64_t periodsFromStart; /* count number of periods since start */ union _tmr_ctrl_t* next;
pthread_t thread; /* timer thread ID */ };
tmr_list_t * tmrList;
mpool_h poolHandle;
char label[12];
int maxNumOfTmrs;
int numOfTmrs;
int ticks;
int maxTicks;
int tmrCtrlId;
} tmr_ctrl_t; } tmr_ctrl_t;
typedef struct tmr_obj_t {
uintptr_t id;
tmr_ctrl_t* ctrl;
struct tmr_obj_t* mnext;
struct tmr_obj_t* prev;
struct tmr_obj_t* next;
uint16_t slot;
uint8_t wheel;
uint8_t state;
uint8_t refCount;
uint8_t reserved1;
uint16_t reserved2;
union {
int64_t expireAt;
int64_t executedBy;
};
TAOS_TMR_CALLBACK fp;
void* param;
} tmr_obj_t;
typedef struct timer_list_t {
int64_t lockedBy;
tmr_obj_t* timers;
} timer_list_t;
typedef struct timer_map_t {
uint32_t size;
uint32_t count;
timer_list_t* slots;
} timer_map_t;
typedef struct time_wheel_t {
pthread_mutex_t mutex;
int64_t nextScanAt;
uint32_t resolution;
uint16_t size;
uint16_t index;
tmr_obj_t** slots;
} time_wheel_t;
uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE; uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE;
void taosTmrProcessList(tmr_ctrl_t *);
tmr_ctrl_t tmrCtrl[MAX_NUM_OF_TMRCTL]; static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT;
int numOfTmrCtrl = 0; static pthread_mutex_t tmrCtrlMutex;
void * tmrIdPool = NULL; static tmr_ctrl_t tmrCtrls[MAX_NUM_OF_TMRCTL];
void * tmrQhandle; static tmr_ctrl_t* unusedTmrCtrl = NULL;
void* tmrQhandle;
int taosTmrThreads = 1; int taosTmrThreads = 1;
void taosTimerLoopFunc(int signo) { static uintptr_t nextTimerId = 0;
tmr_ctrl_t *pCtrl;
int count = 0; static time_wheel_t wheels[] = {
{.resolution = MSECONDS_PER_TICK, .size = 4096},
{.resolution = 1000, .size = 1024},
{.resolution = 60000, .size = 1024},
};
static timer_map_t timerMap;
static uintptr_t getNextTimerId() {
uintptr_t id;
do {
id = __sync_add_and_fetch_ptr(&nextTimerId, 1);
} while (id == 0);
return id;
}
static void timerAddRef(tmr_obj_t* timer) { __sync_add_and_fetch_8(&timer->refCount, 1); }
for (int i = 1; i < MAX_NUM_OF_TMRCTL; ++i) { static void timerDecRef(tmr_obj_t* timer) {
pCtrl = tmrCtrl + i; if (__sync_sub_and_fetch_8(&timer->refCount, 1) == 0) {
if (pCtrl->signature) { free(timer);
count++;
pCtrl->ticks++;
if (pCtrl->ticks >= pCtrl->maxTicks) {
taosTmrProcessList(pCtrl);
pCtrl->ticks = 0;
} }
if (count >= numOfTmrCtrl) break; }
static void lockTimerList(timer_list_t* list) {
int64_t tid = taosGetPthreadId();
int i = 0;
while (__sync_val_compare_and_swap_64(&(list->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) {
sched_yield();
} }
} }
} }
void taosTmrModuleInit(void) { static void unlockTimerList(timer_list_t* list) {
tmrIdPool = taosInitIdPool(MAX_NUM_OF_TMRCTL); int64_t tid = taosGetPthreadId();
memset(tmrCtrl, 0, sizeof(tmrCtrl)); if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) {
assert(false);
tmrError("trying to unlock a timer list not locked by current thread.");
}
}
static void addTimer(tmr_obj_t* timer) {
timerAddRef(timer);
timer->wheel = tListLen(wheels);
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); uint32_t idx = (uint32_t)(timer->id % timerMap.size);
timer_list_t* list = timerMap.slots + idx;
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); lockTimerList(list);
tmrTrace("timer module is initialized, thread:%d", taosTmrThreads); timer->mnext = list->timers;
list->timers = timer;
unlockTimerList(list);
} }
void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) { static tmr_obj_t* findTimer(uintptr_t id) {
static pthread_once_t tmrInit = PTHREAD_ONCE_INIT; tmr_obj_t* timer = NULL;
tmr_ctrl_t * pCtrl; if (id > 0) {
uint32_t idx = (uint32_t)(id % timerMap.size);
pthread_once(&tmrInit, taosTmrModuleInit); timer_list_t* list = timerMap.slots + idx;
lockTimerList(list);
for (timer = list->timers; timer != NULL; timer = timer->mnext) {
if (timer->id == id) {
timerAddRef(timer);
break;
}
}
unlockTimerList(list);
}
return timer;
}
int tmrCtrlId = taosAllocateId(tmrIdPool); static void removeTimer(uintptr_t id) {
tmr_obj_t* prev = NULL;
uint32_t idx = (uint32_t)(id % timerMap.size);
timer_list_t* list = timerMap.slots + idx;
lockTimerList(list);
for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) {
if (p->id == id) {
if (prev == NULL) {
list->timers = p->mnext;
} else {
prev->mnext = p->mnext;
}
timerDecRef(p);
break;
}
prev = p;
}
unlockTimerList(list);
}
if (tmrCtrlId < 0) { static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
tmrError("%s bug!!! too many timers!!!", label); timerAddRef(timer);
return NULL; // select a wheel for the timer, we are not an accurate timer,
// but the inaccuracy should not be too large.
timer->wheel = tListLen(wheels) - 1;
for (uint8_t i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
if (delay < wheel->resolution * wheel->size) {
timer->wheel = i;
break;
}
}
time_wheel_t* wheel = wheels + timer->wheel;
timer->prev = NULL;
timer->expireAt = taosGetTimestampMs() + delay;
pthread_mutex_lock(&wheel->mutex);
uint32_t idx = 0;
if (timer->expireAt > wheel->nextScanAt) {
// adjust delay according to next scan time of this wheel
// so that the timer is not fired earlier than desired.
delay = (uint32_t)(timer->expireAt - wheel->nextScanAt);
idx = (delay + wheel->resolution - 1) / wheel->resolution;
}
timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size);
tmr_obj_t* p = wheel->slots[timer->slot];
wheel->slots[timer->slot] = timer;
timer->next = p;
if (p != NULL) {
p->prev = timer;
}
pthread_mutex_unlock(&wheel->mutex);
}
static bool removeFromWheel(tmr_obj_t* timer) {
if (timer->wheel >= tListLen(wheels)) {
return false;
}
time_wheel_t* wheel = wheels + timer->wheel;
bool removed = false;
pthread_mutex_lock(&wheel->mutex);
// other thread may modify timer->wheel, check again.
if (timer->wheel < tListLen(wheels)) {
if (timer->prev != NULL) {
timer->prev->next = timer->next;
}
if (timer->next != NULL) {
timer->next->prev = timer->prev;
}
if (timer == wheel->slots[timer->slot]) {
wheel->slots[timer->slot] = timer->next;
}
timer->wheel = tListLen(wheels);
timer->next = NULL;
timer->prev = NULL;
timerDecRef(timer);
removed = true;
}
pthread_mutex_unlock(&wheel->mutex);
return removed;
}
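As a side note, the slot chosen by `addToWheel` is simply `ceil(delay / resolution)` ticks ahead of the slot the scan loop will visit next (hence the extra `+ 1` past `wheel->index`). A minimal standalone sketch of that arithmetic; the `pick_slot` helper is hypothetical and not part of TDengine:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper mirroring the slot arithmetic in addToWheel:
   the timer lands ceil(delay / resolution) ticks after the slot the
   scan loop will process next. */
static uint16_t pick_slot(uint32_t wheel_index, uint32_t wheel_size,
                          uint32_t resolution_ms, uint32_t delay_ms) {
  uint32_t ticks = (delay_ms + resolution_ms - 1) / resolution_ms; /* round up */
  return (uint16_t)((wheel_index + ticks + 1) % wheel_size);
}
```

Because the division rounds up, a timer is never placed in a slot that fires earlier than its requested delay.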
static void processExpiredTimer(void* handle, void* arg) {
tmr_obj_t* timer = (tmr_obj_t*)handle;
timer->executedBy = taosGetPthreadId();
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED);
if (state == TIMER_STATE_WAITING) {
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution start.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
(*timer->fp)(timer->param, (tmr_h)timer->id);
atomic_store_8(&timer->state, TIMER_STATE_STOPPED);
fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution end.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
}
removeTimer(timer->id);
timerDecRef(timer);
}
static void addToExpired(tmr_obj_t* head) {
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] expired";
while (head != NULL) {
tmrTrace(fmt, head->ctrl->label, head->id, head->fp, head->param);
tmr_obj_t* next = head->next;
SSchedMsg schedMsg;
schedMsg.fp = NULL;
schedMsg.tfp = processExpiredTimer;
schedMsg.ahandle = head;
schedMsg.thandle = NULL;
taosScheduleTask(tmrQhandle, &schedMsg);
head = next;
}
}
static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) {
uintptr_t id = getNextTimerId();
timer->id = id;
timer->state = TIMER_STATE_WAITING;
timer->fp = fp;
timer->param = param;
timer->ctrl = ctrl;
addTimer(timer);
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] started";
tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param);
if (mseconds == 0) {
timer->wheel = tListLen(wheels);
timerAddRef(timer);
addToExpired(timer);
} else {
addToWheel(timer, mseconds);
}
// note: using `timer->id` here is unsafe as `timer` may already be freed
return id;
}
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return NULL;
}
tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t));
if (timer == NULL) {
tmrError("failed to allocate memory for new timer object.");
return NULL;
}
return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
}
static void taosTimerLoopFunc(int signo) {
int64_t now = taosGetTimestampMs();
for (int i = 0; i < tListLen(wheels); i++) {
// `expired` is a temporary expire list.
// expired timers are first added to this list, then moved
// to expired queue as a batch to improve performance.
// note this list is used as a stack in this function.
tmr_obj_t* expired = NULL;
time_wheel_t* wheel = wheels + i;
while (now >= wheel->nextScanAt) {
pthread_mutex_lock(&wheel->mutex);
wheel->index = (wheel->index + 1) % wheel->size;
tmr_obj_t* timer = wheel->slots[wheel->index];
while (timer != NULL) {
tmr_obj_t* next = timer->next;
if (now < timer->expireAt) {
timer = next;
continue;
}
// remove from the wheel
if (timer->prev == NULL) {
wheel->slots[wheel->index] = next;
if (next != NULL) {
next->prev = NULL;
}
} else {
timer->prev->next = next;
if (next != NULL) {
next->prev = timer->prev;
}
pCtrl->numOfTmrs++;
if (pthread_mutex_unlock(&pCtrl->mutex) != 0)
tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));
tmrTrace("%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d", pCtrl->label, param1, fp, pObj, index,
pCtrl->numOfTmrs, cindex);
return (tmr_h)pObj;
}
void taosTmrStop(tmr_h timerId) {
tmr_obj_t * pObj;
tmr_list_t *pList;
tmr_ctrl_t *pCtrl;
pObj = (tmr_obj_t *)timerId;
if (pObj == NULL) return;
pCtrl = pObj->pCtrl;
if (pCtrl == NULL) return;
if (pthread_mutex_lock(&pCtrl->mutex) != 0)
tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));
if (pObj->timerId == timerId) {
pList = &(pCtrl->tmrList[pObj->index]);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pList->head = pObj->next;
} }
timer->wheel = tListLen(wheels);
// add to temporary expire list
timer->next = expired;
timer->prev = NULL;
if (expired != NULL) {
expired->prev = timer;
}
expired = timer;
timer = next;
}
pthread_mutex_unlock(&wheel->mutex);
wheel->nextScanAt += wheel->resolution;
}
addToExpired(expired);
}
}
static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
bool reusable = false;
if (state == TIMER_STATE_WAITING) {
if (removeFromWheel(timer)) {
removeTimer(timer->id);
// only safe to reuse the timer when timer is removed from the wheel.
// we cannot guarantee the thread safety of the timer in all other cases.
reusable = true;
}
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] is cancelled.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
} else if (state != TIMER_STATE_EXPIRED) {
// timer already stopped or cancelled, has nothing to do in this case
} else if (timer->executedBy == taosGetPthreadId()) {
// taosTmrReset is called in the timer callback, should do nothing in this
// case to avoid dead lock. note taosTmrReset must be the last statement
// of the callback function, otherwise it will be a bug.
} else {
assert(timer->executedBy != taosGetPthreadId());
const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] fired, waiting...";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) {
if (i % 1000 == 0) {
sched_yield();
}
}
fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] stopped.";
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
}
return reusable;
}
bool taosTmrStop(tmr_h timerId) {
uintptr_t id = (uintptr_t)timerId;
tmr_obj_t* timer = findTimer(id);
if (timer == NULL) {
tmrTrace("timer[id=%lld] does not exist", id);
return false;
}
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
doStopTimer(timer, state);
timerDecRef(timer);
return state == TIMER_STATE_WAITING;
}
bool taosTmrStopA(tmr_h* timerId) {
bool ret = taosTmrStop(*timerId);
*timerId = NULL;
return ret;
}
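The stop/expire race hinges on a single compare-and-swap on `timer->state`: whichever of the timer thread and the stopping thread flips it out of the waiting state first wins, and the loser backs off. A minimal standalone sketch of that hand-off; the enum values and helper names here are illustrative, not the TDengine definitions:

```c
#include <assert.h>
#include <stdint.h>

enum { ST_WAITING = 0, ST_EXPIRED = 1, ST_STOPPED = 2, ST_CANCELED = 3 };

/* The canceller tries WAITING -> CANCELED; returns 1 only if it won. */
static int try_cancel(uint8_t* state) {
  return __sync_val_compare_and_swap(state, (uint8_t)ST_WAITING, (uint8_t)ST_CANCELED) == ST_WAITING;
}

/* The timer thread tries WAITING -> EXPIRED before running the callback. */
static int try_expire(uint8_t* state) {
  return __sync_val_compare_and_swap(state, (uint8_t)ST_WAITING, (uint8_t)ST_EXPIRED) == ST_WAITING;
}
```

Exactly one of the two transitions can succeed for a given timer, which is why `taosTmrStop` can report whether the callback was prevented from running.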
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
if (ctrl == NULL || ctrl->label[0] == 0) {
return false;
}
uintptr_t id = (uintptr_t)*pTmrId;
bool stopped = false;
tmr_obj_t* timer = findTimer(id);
if (timer == NULL) {
tmrTrace("timer[id=%lld] does not exist", id);
} else {
uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED);
if (!doStopTimer(timer, state)) {
timerDecRef(timer);
timer = NULL;
}
stopped = state == TIMER_STATE_WAITING;
}
if (timer == NULL) {
*pTmrId = taosTmrStart(fp, mseconds, param, handle);
return stopped;
}
tmrTrace("timer[id=%lld] is reused", timer->id);
// wait until there's no other reference to this timer,
// so that we can reuse this timer safely.
for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) {
if (i % 1000 == 0) {
sched_yield();
}
}
assert(timer->refCount == 1);
memset(timer, 0, sizeof(*timer));
*pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl);
return stopped;
}
static void taosTmrModuleInit(void) {
for (int i = 0; i < tListLen(tmrCtrls) - 1; ++i) {
tmr_ctrl_t* ctrl = tmrCtrls + i;
ctrl->next = ctrl + 1;
}
unusedTmrCtrl = tmrCtrls;
pthread_mutex_init(&tmrCtrlMutex, NULL);
int64_t now = taosGetTimestampMs();
for (int i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno));
return;
}
wheel->nextScanAt = now + wheel->resolution;
wheel->index = 0;
wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*));
if (wheel->slots == NULL) {
tmrError("failed to allocate wheel slots");
return;
}
timerMap.size += wheel->size;
}
timerMap.count = 0;
timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t));
if (timerMap.slots == NULL) {
tmrError("failed to allocate hash map");
return;
}
tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");
taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
tmrTrace("timer module is initialized, number of threads: %d", taosTmrThreads);
}
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
pthread_once(&tmrModuleInit, taosTmrModuleInit);
pthread_mutex_lock(&tmrCtrlMutex);
tmr_ctrl_t* ctrl = unusedTmrCtrl;
if (ctrl != NULL) {
unusedTmrCtrl = ctrl->next;
}
pthread_mutex_unlock(&tmrCtrlMutex);
if (ctrl == NULL) {
tmrError("too many timer controllers, failed to create timer controller[label=%s].", label);
return NULL;
}
strncpy(ctrl->label, label, sizeof(ctrl->label));
ctrl->label[sizeof(ctrl->label) - 1] = 0;
tmrTrace("timer controller[label=%s] is initialized.", label);
return ctrl;
}
void taosTmrCleanUp(void* handle) {
tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle;
assert(ctrl != NULL && ctrl->label[0] != 0);
tmrTrace("timer controller[label=%s] is cleaned up.", ctrl->label);
ctrl->label[0] = 0;
pthread_mutex_lock(&tmrCtrlMutex);
ctrl->next = unusedTmrCtrl;
unusedTmrCtrl = ctrl;
pthread_mutex_unlock(&tmrCtrlMutex);
}
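`taosTmrInit` and `taosTmrCleanUp` recycle a fixed pool of controllers through an intrusive singly linked free list guarded by `tmrCtrlMutex`. A standalone sketch of that push/pop discipline; the types and helper names are illustrative, and the mutex is omitted for brevity:

```c
#include <assert.h>
#include <stddef.h>

typedef struct ctrl_s {
  struct ctrl_s* next;
} ctrl_t;

static ctrl_t* unused_list = NULL;

/* taosTmrInit-style acquire: pop the head of the free list (or NULL). */
static ctrl_t* ctrl_acquire(void) {
  ctrl_t* c = unused_list;
  if (c != NULL) unused_list = c->next;
  return c;
}

/* taosTmrCleanUp-style release: push the controller back onto the list. */
static void ctrl_release(ctrl_t* c) {
  c->next = unused_list;
  unused_list = c;
}
```

Because no memory is allocated or freed, a controller handle stays valid for the process lifetime and can be handed out again after cleanup.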