提交 cb52b7a7 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -274,7 +274,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ENDIF()
......
......@@ -32,25 +32,22 @@ TDengine's JDBC driver implementation is as consistent as possible with the rela
Native connections are supported on the same platforms as the TDengine client driver.
REST connection supports all platforms that can run Java.
## Version support
Please refer to [version support list](/reference/connector#version-support)
## Recent update logs
| taos-jdbcdriver version | major changes |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket |
| 3.2.0 | This version has been deprecated |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket |
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment |
| 3.0.0 | Support for TDengine 3.0 |
| 2.0.42 | fix wasNull interface return value in WebSocket connection |
| 2.0.41 | fix decode method of username and password in REST connection |
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters |
| 2.0.38 | JDBC REST connections add bulk pull function |
| 2.0.37 | Support json tags |
| 2.0.36 | Support schemaless writing |
| taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.1 | subscription add seek function | 3.0.5.0 or later |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
| 3.2.0 | This version has been deprecated | - |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
| 2.0.42 | fix wasNull interface return value in WebSocket connection | - |
| 2.0.41 | fix decode method of username and password in REST connection | - |
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
| 2.0.38 | JDBC REST connections add bulk pull function | - |
| 2.0.37 | Support json tags | - |
| 2.0.36 | Support schemaless writing | - |
**Note**: adding `batchfetch` to the REST connection and setting it to true will enable the WebSocket connection.
......@@ -102,6 +99,8 @@ For specific error codes, please refer to.
| 0x2319 | user is required | The user name information is missing when creating the connection |
| 0x231a | password is required | Password information is missing when creating a connection |
| 0x231c | httpEntity is null, sql: | Execution exception occurred during the REST connection |
| 0x231d | can't create connection with server within | Increase the connection time by adding the httpConnectTimeout parameter, or check the connection to the taos adapter. |
| 0x231e | failed to complete the task within the specified time | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the taos adapter. |
| 0x2350 | unknown error | Unknown exception, please return to the developer on github. |
| 0x2352 | Unsupported encoding | An unsupported character encoding set is specified under the native Connection. |
| 0x2353 | internal error of database, please see taoslog for more details | An error occurs when the prepare statement is executed on the native connection. Check the taos log to locate the fault. |
......@@ -117,8 +116,8 @@ For specific error codes, please refer to.
| 0x2376 | failed to set consumer topic, topic name is empty | During data subscription creation, the subscription topic name is empty. Check that the specified topic name is correct. |
| 0x2377 | consumer reference has been destroyed | The subscription data transfer channel has been closed. Please check the connection to TDengine. |
| 0x2378 | consumer create error | Failed to create a data subscription. Check the taos log according to the error message to locate the fault. |
| - | can't create connection with server within | Increase the connection time by adding the httpConnectTimeout parameter, or check the connection to the taos adapter. |
| - | failed to complete the task within the specified time | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the taos adapter. |
| 0x2379 | seek offset must not be a negative number | The seek interface parameter cannot be negative. Use the correct parameter |
| 0x237a | vGroup not found in result set | subscription is not bound to the VGroup due to the rebalance mechanism |
- [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java)
<!-- - [TDengine_ERROR_CODE](../error-code) -->
......@@ -169,7 +168,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
</dependency>
```
......@@ -913,14 +912,15 @@ public class SchemalessWsTest {
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata&batchfetch=true";
Connection connection = DriverManager.getConnection(url);
init(connection);
SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless");
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
System.exit(0);
try(Connection connection = DriverManager.getConnection(url)){
init(connection);
try(SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless")){
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
}
}
}
private static void init(Connection connection) throws SQLException {
......@@ -991,6 +991,17 @@ while(true) {
`poll` obtains one message each time it is run.
#### Assignment subscription Offset
```
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
void seek(TopicPartition partition, long offset) throws SQLException;
```
#### Close subscriptions
```java
......
此差异已折叠。
......@@ -5,7 +5,7 @@
#spring.datasource.password=taosdata
# datasource config - JDBC-RESTful
spring.datasource.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver
spring.datasource.url=jdbc:TAOS-RS://localhost:6041/test?timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8
spring.datasource.url=jdbc:TAOS-RS://localhost:6041/test
spring.datasource.username=root
spring.datasource.password=taosdata
spring.datasource.druid.initial-size=5
......
......@@ -42,27 +42,27 @@ IF (TD_LINUX)
)
target_link_libraries(tmq
taos_static
taos
)
target_link_libraries(stream_demo
taos_static
taos
)
target_link_libraries(schemaless
taos_static
taos
)
target_link_libraries(prepare
taos_static
taos
)
target_link_libraries(demo
taos_static
taos
)
target_link_libraries(asyncdemo
taos_static
taos
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
......
......@@ -163,6 +163,7 @@ typedef struct {
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
int64_t streamBackendRid;
} SStreamState;
typedef struct SFunctionStateStore {
......
......@@ -345,7 +345,6 @@ typedef struct SStreamMeta {
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int32_t streamBackendId;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
} SStreamMeta;
......
......@@ -22,21 +22,20 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
// #if !defined(WINDOWS)
#if !defined(WINDOWS)
// #ifndef ALLOW_FORBID_FUNC
// #define malloc MALLOC_FUNC_TAOS_FORBID
// #define calloc CALLOC_FUNC_TAOS_FORBID
// #define realloc REALLOC_FUNC_TAOS_FORBID
// #define free FREE_FUNC_TAOS_FORBID
// #ifdef strdup
// #undef strdup
// #define strdup STRDUP_FUNC_TAOS_FORBID
// #endif
// #endif // ifndef ALLOW_FORBID_FUNC
// #endif // if !defined(WINDOWS)
#ifndef ALLOW_FORBID_FUNC
#define malloc MALLOC_FUNC_TAOS_FORBID
#define calloc CALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID
#ifdef strdup
#undef strdup
#define strdup STRDUP_FUNC_TAOS_FORBID
#endif
#endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
// // #define taosMemoryFree malloc
// #define taosMemoryMalloc malloc
// #define taosMemoryCalloc calloc
// #define taosMemoryRealloc realloc
......
......@@ -80,5 +80,4 @@ fi
# there can not libtaos.so*, otherwise ln -s error
${csudo}rm -f ${install_main_dir}/driver/libtaos.* || :
[ -f ${install_main_dir}/driver/librocksdb.* ] && ${csudo}rm -f ${install_main_dir}/driver/librocksdb.* || :
[ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}rm -f ${install_main_dir}/driver/libtaosws.so || :
......@@ -40,7 +40,6 @@ else
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
[ -f ${lib_link_dir}/librocksdb.* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
[ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || :
${csudo}rm -f ${log_link_dir} || :
......
......@@ -31,7 +31,6 @@ cd ${pkg_dir}
libfile="libtaos.so.${tdengine_ver}"
wslibfile="libtaosws.so"
rocksdblib="librocksdb.so.8"
# create install dir
install_home_path="/usr/local/taos"
......@@ -95,7 +94,6 @@ fi
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
[ -f ${compile_dir}/build/lib/${rocksdblib} ] && cp ${compile_dir}/build/lib/${rocksdblib} ${pkg_dir}${install_home_path}/driver ||:
[ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||:
cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include
cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include
......
......@@ -45,7 +45,6 @@ echo buildroot: %{buildroot}
libfile="libtaos.so.%{_version}"
wslibfile="libtaosws.so"
rocksdblib="librocksdb.so.8"
# create install path, and cp file
mkdir -p %{buildroot}%{homepath}/bin
......@@ -93,7 +92,6 @@ if [ -f %{_compiledir}/build/bin/taosadapter ]; then
fi
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
[ -f %{_compiledir}/build/lib/${wslibfile} ] && cp %{_compiledir}/build/lib/${wslibfile} %{buildroot}%{homepath}/driver ||:
[ -f %{_compiledir}/build/lib/${rocksdblib} ] && cp %{_compiledir}/build/lib/${rocksdblib} %{buildroot}%{homepath}/driver ||:
cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include
......@@ -176,7 +174,6 @@ fi
# there can not libtaos.so*, otherwise ln -s error
${csudo}rm -f %{homepath}/driver/libtaos* || :
${csudo}rm -f %{homepath}/driver/librocksdb* || :
#Scripts executed after installation
%post
......@@ -222,7 +219,6 @@ if [ $1 -eq 0 ];then
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
${csudo}rm -f ${log_link_dir} || :
${csudo}rm -f ${data_link_dir} || :
......
......@@ -250,30 +250,18 @@ function install_lib() {
# Remove links
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
${csudo}rm -f ${lib64_link_dir}/librocksdb.* || :
#${csudo}rm -rf ${v15_java_app_dir} || :
${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/*
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8
${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8
${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so
[ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib64_link_dir}/librocksdb.so.8 || :
${csudo}ln -sf ${lib64_link_dir}/librocksdb.so.8 ${lib64_link_dir}/librocksdb.so || :
[ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || :
fi
......
......@@ -111,11 +111,9 @@ fi
if [ "$osType" == "Darwin" ]; then
lib_files="${build_dir}/lib/libtaos.${version}.dylib"
wslib_files="${build_dir}/lib/libtaosws.dylib"
rocksdb_lib_files="${build_dir}/lib/librocksdb.dylib.8.1.1"
else
lib_files="${build_dir}/lib/libtaos.so.${version}"
wslib_files="${build_dir}/lib/libtaosws.so"
rocksdb_lib_files="${build_dir}/lib/librocksdb.so.8.1.1"
fi
header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
......@@ -338,7 +336,6 @@ fi
# Copy driver
mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt
[ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || :
[ -f ${rocksdb_lib_files} ] && cp ${rocksdb_lib_files} ${install_dir}/driver || :
# Copy connector
if [ "$verMode" == "cluster" ]; then
......
......@@ -202,19 +202,10 @@ function install_lib() {
log_print "start install lib from ${lib_dir} to ${lib_link_dir}"
${csudo}rm -f ${lib_link_dir}/libtaos* || :
${csudo}rm -f ${lib64_link_dir}/libtaos* || :
#rocksdb
[ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || :
[ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || :
#rocksdb
[ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || :
[ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || :
[ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || :
[ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || :
${csudo}ln -s ${lib_dir}/librocksdb.* ${lib_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1
......@@ -223,7 +214,6 @@ function install_lib() {
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then
${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1
${csudo}ln -s ${lib_dir}/librocksdb.* ${lib64_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1
[ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} 2>>${install_log_path}
fi
......
......@@ -142,11 +142,9 @@ function clean_local_bin() {
function clean_lib() {
# Remove link
${csudo}rm -f ${lib_link_dir}/libtaos.* || :
${csudo}rm -f ${lib_link_dir}/librocksdb.* || :
[ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || :
${csudo}rm -f ${lib64_link_dir}/libtaos.* || :
${csudo}rm -f ${lib64_link_dir}/librocksdb.* || :
[ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || :
#${csudo}rm -rf ${v15_java_app_dir} || :
......
......@@ -115,7 +115,7 @@ target_link_libraries(
# PUBLIC bdb
# PUBLIC scalar
PUBLIC rocksdb-shared
PUBLIC rocksdb
PUBLIC transport
PUBLIC stream
PUBLIC index
......
......@@ -647,6 +647,8 @@ uint64_t calcGroupId(char* pData, int32_t len) {
// NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t));
if (0 == id)
memcpy(&id, context.digest + 8, sizeof(uint64_t));
return id;
}
......
......@@ -319,6 +319,11 @@ void destroyMergeJoinOperator(void* param) {
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks);
taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations);
taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param);
}
......
......@@ -213,6 +213,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
} else {
if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator);
} else if (limitReached && groupId == 0) {
setOperatorCompleted(pOperator);
}
}
......
......@@ -11,7 +11,7 @@ if(${BUILD_WITH_ROCKSDB})
IF (TD_LINUX)
target_link_libraries(
stream
PUBLIC rocksdb-shared tdb
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor wal index
)
ELSE()
......
......@@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......@@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId;
#ifdef __cplusplus
}
#endif
......
......@@ -16,7 +16,9 @@
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "streamInc.h"
#include "tcommon.h"
#include "tref.h"
typedef struct SCompactFilteFactory {
void* status;
......@@ -79,8 +81,8 @@ const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name);
void* streamBackendInit(const char* path) {
qDebug("init stream backend");
SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle));
qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
pHandle->list = tdListNew(sizeof(SCfComparator));
taosThreadMutexInit(&pHandle->mutex, NULL);
taosThreadMutexInit(&pHandle->cfMutex, NULL);
......@@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) {
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
goto _EXIT;
}
} else {
/*
......@@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) {
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
return (void*)pHandle;
_EXIT:
......@@ -140,7 +144,8 @@ _EXIT:
taosHashCleanup(pHandle->cfInst);
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
free(pHandle);
taosMemoryFree(pHandle);
qDebug("failed to init stream backend at %s", path);
return NULL;
}
void streamBackendCleanup(void* arg) {
......@@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) {
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
taosThreadMutexDestroy(&pHandle->mutex);
SListNode* head = tdListPopHead(pHandle->list);
while (head != NULL) {
streamStateDestroyCompar(head->data);
taosMemoryFree(head);
head = tdListPopHead(pHandle->list);
}
// rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
tdListFree(pHandle->list);
taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex);
taosMemoryFree(pHandle);
qDebug("destroy stream backend backend:%p", pHandle);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
......@@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
return 0;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
......@@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId);
qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId);
return 0;
}
......@@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId,
pState->taskId);
qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle,
pState->streamId, pState->taskId);
if (pState->pTdbState->rocksdb == NULL) {
return;
}
......@@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL;
taosReleaseRef(streamBackendId, pState->streamBackendRid);
}
void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg;
......
......@@ -20,7 +20,7 @@
#include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0;
int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
......@@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId;
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state");
......
......@@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
}
SStreamTask* pStreamTask = pTask;
char statePath[1024];
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
} else {
......@@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta;
taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid);
pState->streamBackendRid = pMeta->streamBackendRid;
int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) {
taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid);
taosReleaseRef(streamBackendId, pMeta->streamBackendRid);
taosMemoryFree(pState);
pState = NULL;
}
......@@ -222,9 +222,7 @@ _err:
void streamStateClose(SStreamState* pState, bool remove) {
SStreamTask* pTask = pState->pTdbState->pOwner;
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
......@@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
......@@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
}
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
......
......@@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0 || len == 0 || val == NULL) {
return TSDB_CODE_FAILED;
}
memcpy(val, buf, len);
memcpy(buf, val, len);
buf[len] = 0;
maxCheckPointId = atol((char*)buf);
taosMemoryFree(val);
......@@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
if (code != 0) {
return TSDB_CODE_FAILED;
}
memcpy(val, buf, len);
memcpy(buf, val, len);
buf[len] = 0;
taosMemoryFree(val);
......
......@@ -9,35 +9,35 @@ add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
target_link_libraries(
tmq_offset
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
create_table
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_demo
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_sim
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_taosx_ci
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
......@@ -45,7 +45,7 @@ target_link_libraries(
target_link_libraries(
write_raw_block_test
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
......@@ -53,7 +53,7 @@ target_link_libraries(
target_link_libraries(
sml_test
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
......@@ -61,7 +61,7 @@ target_link_libraries(
target_link_libraries(
get_db_name_test
PUBLIC taos_static
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册