Commit 2d41a429 authored by wmmhello

Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-3728

......@@ -18,7 +18,7 @@ The full package of TDengine includes the TDengine Server (`taosd`), TDengine Cl
The standard server installation package includes `taos`, `taosd`, `taosAdapter`, `taosBenchmark`, and sample code. You can also download the Lite package that includes only `taosd` and the C/C++ connector.
The TDengine Community Edition is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine over `apt-get` as well. The .tar.gz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/M1 macOS.
TDengine OSS is released as Deb and RPM packages. The Deb package can be installed on Debian, Ubuntu, and derivative systems. The RPM package can be installed on CentOS, RHEL, SUSE, and derivative systems. A .tar.gz package is also provided for enterprise customers, and you can install TDengine over `apt-get` as well. The .tar.gz package includes `taosdump` and the TDinsight installation script. If you want to use these utilities with the Deb or RPM package, download and install taosTools separately. TDengine can also be installed on x64 Windows and x64/M1 macOS.
## Operating environment requirements
In the Linux system, the minimum requirements for the operating environment are as follows:
......
......@@ -403,7 +403,7 @@ In this section we will demonstrate 5 examples of developing UDF in Python langu
This guide also explains some debugging techniques for Python UDFs.
We assume you are using a Linux system and already have TDengine 3.0.4.0+ and Python 3.x.
We assume you are using a Linux system and already have TDengine 3.0.4.0+ and Python 3.7+.
Note: **You can't use the print() function to output logs inside a UDF; you have to write logs to a file or use Python's logging module.**
......
......@@ -58,7 +58,7 @@ database_option: {
- WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk.
- MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096.
- MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. The Enterprise Edition supports the [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, so multiple KEEP values are supported (comma separated, up to 3 values, satisfying keep0 <= keep1 <= keep2, e.g. KEEP 100h,100d,3650d); the Community Edition does not support Tiered Storage (even if multiple keep values are configured, they do not take effect; only the maximum keep value is used as KEEP). See the example after this list.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. TDengine Enterprise supports the [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, so multiple KEEP values are supported (comma separated, up to 3 values, satisfying keep0 <= keep1 <= keep2, e.g. KEEP 100h,100d,3650d); TDengine OSS does not support Tiered Storage (even if multiple keep values are configured, they do not take effect; only the maximum keep value is used as KEEP). See the example after this list.
- PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB.
- PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384.
- PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.
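For reference, all of these options are set at database creation time. Below is a minimal sketch with the C client; the host, credentials, and database name `power` are placeholders, the option values are illustrative only, and the three-value KEEP clause changes behavior only on TDengine Enterprise with tiered storage:

```c
#include <stdio.h>

#include "taos.h"  // TDengine C client header

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 6030);
  if (conn == NULL) {
    fprintf(stderr, "connect failed: %s\n", taos_errstr(NULL));
    return 1;
  }
  // KEEP with three comma-separated values only takes effect with tiered
  // storage (TDengine Enterprise); OSS uses the maximum value, 3650d here.
  TAOS_RES *res = taos_query(conn,
      "CREATE DATABASE IF NOT EXISTS power "
      "KEEP 60d,100d,3650d WAL_FSYNC_PERIOD 3000 PRECISION 'ms'");
  if (taos_errno(res) != 0) {
    fprintf(stderr, "create database failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);
  taos_close(conn);
  return 0;
}
```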
......
......@@ -214,19 +214,6 @@ The data of tdinsight dashboard is stored in `log` database (default. You can ch
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### logs table
`logs` table contains log information records.
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|level|VARCHAR||log level|
|content|NCHAR||log content|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_summary table
`log_summary` table contains log summary information records.
......
......@@ -648,12 +648,12 @@ stmt.execute()?;
//stmt.execute()?;
```
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs).
For a working example, see [GitHub](https://github.com/taosdata/taos-connector-rust/blob/main/taos/examples/bind.rs).
For information about other structure APIs, see the [Rust documentation](https://docs.rs/taos).
[taos]: https://github.com/taosdata/rust-connector-taos
[taos]: https://github.com/taosdata/taos-connector-rust
[r2d2]: https://crates.io/crates/r2d2
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
......
......@@ -1007,13 +1007,12 @@ consumer.close()
### Other sample programs
| Example program links | Example program content |
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
|-----------------------|-------------------------|
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | parameter binding, bind one row at once |
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
| [tmq.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq.py) | TMQ subscription |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | TMQ subscription |
## Other notes
......
......@@ -398,7 +398,7 @@ def finish(buf: bytes) -> output_type:
3. Define a scalar function that takes a timestamp as input and returns the next Sunday closest to that time. Implementing this function requires the third-party library moment. This example explains the considerations for using third-party libraries.
4. Define an aggregate function that computes the difference between the maximum and minimum values of a column, i.e., implement TDengine's built-in spread function.
Many practical debugging techniques are also covered.
This article assumes you are using a Linux system and have already installed TDengine 3.0.4.0+ and Python 3.x.
This article assumes you are using a Linux system and have already installed TDengine 3.0.4.0+ and Python 3.7+.
Note: **You cannot output logs from inside a UDF with the print function; you need to write to a file yourself or use Python's built-in logging library.**
......
......@@ -210,19 +210,6 @@ The TDinsight dashboard data comes from the log database (the default db for storing monitoring data,
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### logs table
The `logs` table records log information.
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|level|VARCHAR||log level|
|content|NCHAR||log content, no longer than 1024 bytes|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_summary table
The `log_summary` table records log summary statistics.
......
......@@ -112,7 +112,7 @@ TDengine 3.0 uses a consistent hashing algorithm to determine the vnode where each data table resides
### Data Partitioning
In addition to vnode sharding, TDengine also partitions time-series data by time range. Each data file contains the time-series data of only one time range, whose length is determined by the DB configuration parameter days. Partitioning by time range also makes it easy to implement retention policies efficiently: data files older than the configured number of days (system configuration parameter keep) are deleted automatically. Moreover, different time ranges can be stored in different paths and on different storage media, which facilitates hot/cold management of big data and enables tiered storage.
In addition to vnode sharding, TDengine also partitions time-series data by time range. Each data file contains the time-series data of only one time range, whose length is determined by the DB configuration parameter duration. Partitioning by time range also makes it easy to implement retention policies efficiently: data files older than the configured number of days (system configuration parameter keep) are deleted automatically. Moreover, different time ranges can be stored in different paths and on different storage media, which facilitates hot/cold management of big data and enables tiered storage.
Overall, **TDengine slices big data along two dimensions, vnode and time**, enabling efficient parallel management and horizontal scaling.
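As a rough illustration of the time dimension of this split: which file set a row belongs to depends only on its timestamp and the duration parameter. The sketch below is simplified arithmetic, not the actual vnode implementation (which also accounts for timestamp precision and alignment):

```c
#include <stdint.h>
#include <stdio.h>

// Illustrative only: map a millisecond timestamp to a time-range (file set)
// id, assuming duration is given in days as the DB option is.
static int64_t fileSetId(int64_t ts_ms, int32_t duration_days) {
  int64_t span_ms = (int64_t)duration_days * 24 * 60 * 60 * 1000;
  return ts_ms / span_ms;  // rows within the same span share one file set
}

int main(void) {
  int64_t ts = 1690000000000LL;  // a timestamp in July 2023, in milliseconds
  // With duration = 10d, all rows from the same 10-day window land together.
  printf("file set id: %lld\n", (long long)fileSetId(ts, 10));
  return 0;
}
```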
......
......@@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
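With this signature change, `tmq_commit_offset_async` reports its outcome only through the callback rather than through a return value. A caller sketch against the declarations above; the topic name, vgroup ID, and offset are placeholder values:

```c
#include <stdio.h>

#include "taos.h"  // tmq_* declarations ship with the C client header

// Signature matches tmq_commit_cb; receives the commit result code.
static void onCommit(tmq_t *tmq, int32_t code, void *param) {
  (void)tmq;
  (void)param;
  if (code != 0) {
    fprintf(stderr, "async commit failed: %s\n", tmq_err2str(code));
  }
}

void commitOffsets(tmq_t *tmq) {
  // Asynchronous: now returns void, so the result arrives only in onCommit.
  tmq_commit_offset_async(tmq, "topic_meters", /*vgId*/ 4, /*offset*/ 100, onCommit, NULL);

  // Synchronous variant still reports through its return value.
  int32_t code = tmq_commit_offset_sync(tmq, "topic_meters", 4, 100);
  if (code != 0) {
    fprintf(stderr, "sync commit failed: %s\n", tmq_err2str(code));
  }
}
```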
......
......@@ -312,7 +312,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
......
......@@ -613,12 +613,6 @@ function install_examples() {
fi
}
function install_web() {
if [ -d "${script_dir}/share" ]; then
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share > /dev/null 2>&1 ||:
fi
}
function clean_service_on_sysvinit() {
if ps aux | grep -v grep | grep ${serverName2} &>/dev/null; then
......@@ -894,7 +888,6 @@ function updateProduct() {
fi
install_examples
install_web
if [ -z $1 ]; then
install_bin
install_service
......@@ -907,20 +900,22 @@ function updateProduct() {
echo
echo -e "${GREEN_DARK}To configure ${productName2} ${NC}: edit ${cfg_install_dir}/${configFile2}"
[ -f ${configDir}/${clientName2}adapter.toml ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To configure ${clientName2} Adapter ${NC}: edit ${configDir}/${clientName2}adapter.toml"
echo -e "${GREEN_DARK}To configure ${clientName2}Adapter ${NC}: edit ${configDir}/${clientName2}adapter.toml"
if ((${service_mod} == 0)); then
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}systemctl start ${serverName2}${NC}"
[ -f ${service_config_dir}/${clientName2}adapter.service ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}systemctl start ${clientName2}adapter ${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${csudo}systemctl start ${clientName2}adapter ${NC}"
elif ((${service_mod} == 1)); then
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}service ${serverName2} start${NC}"
[ -f ${service_config_dir}/${clientName2}adapter.service ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}service ${clientName2}adapter start${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${csudo}service ${clientName2}adapter start${NC}"
else
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ./${serverName2}${NC}"
[ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${clientName2}adapter &${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter &${NC}"
fi
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper &${NC}"
if [ ${openresty_work} = 'true' ]; then
echo -e "${GREEN_DARK}To access ${productName2} ${NC}: use ${GREEN_UNDERLINE}${clientName2} -h $serverFqdn${NC} in shell OR from ${GREEN_UNDERLINE}http://127.0.0.1:${web_port}${NC}"
......@@ -934,6 +929,7 @@ function updateProduct() {
fi
echo
echo -e "\033[44;32;1m${productName2} is updated successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
else
install_bin
install_config
......@@ -971,8 +967,7 @@ function installProduct() {
if [ "$verMode" == "cluster" ]; then
install_connector
fi
install_examples
install_web
install_examples
if [ -z $1 ]; then # install service and client
# For installing new
......@@ -989,21 +984,23 @@ function installProduct() {
echo
echo -e "${GREEN_DARK}To configure ${productName2} ${NC}: edit ${cfg_install_dir}/${configFile2}"
[ -f ${configDir}/${clientName2}adapter.toml ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To configure ${clientName2} Adapter ${NC}: edit ${configDir}/${clientName2}adapter.toml"
echo -e "${GREEN_DARK}To configure ${clientName2}Adapter ${NC}: edit ${configDir}/${clientName2}adapter.toml"
if ((${service_mod} == 0)); then
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}systemctl start ${serverName2}${NC}"
[ -f ${service_config_dir}/${clientName2}adapter.service ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}systemctl start ${clientName2}adapter ${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${csudo}systemctl start ${clientName2}adapter ${NC}"
elif ((${service_mod} == 1)); then
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}service ${serverName2} start${NC}"
[ -f ${service_config_dir}/${clientName2}adapter.service ] && [ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}service ${clientName2}adapter start${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${csudo}service ${clientName2}adapter start${NC}"
else
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${serverName2}${NC}"
[ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${clientName2}adapter &${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter &${NC}"
fi
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper &${NC}"
if [ ! -z "$firstEp" ]; then
tmpFqdn=${firstEp%%:*}
substr=":"
......@@ -1025,6 +1022,7 @@ function installProduct() {
fi
echo -e "\033[44;32;1m${productName2} is installed successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
echo
else # Only install client
install_bin
......
......@@ -432,12 +432,6 @@ function install_examples() {
${csudo}cp -rf ${source_dir}/examples/* ${install_main_dir}/examples || :
}
function install_web() {
if [ -d "${binary_dir}/build/share" ]; then
${csudo}cp -rf ${binary_dir}/build/share/* ${install_main_dir}/share || :
fi
}
function clean_service_on_sysvinit() {
if ps aux | grep -v grep | grep ${serverName} &>/dev/null; then
${csudo}service ${serverName} stop || :
......@@ -592,7 +586,6 @@ function update_TDengine() {
install_lib
# install_connector
install_examples
install_web
install_bin
install_app
......
......@@ -126,7 +126,6 @@ else
fi
install_files="${script_dir}/install.sh"
web_dir="${top_dir}/../enterprise/src/plugins/web"
init_file_deb=${script_dir}/../deb/taosd
init_file_rpm=${script_dir}/../rpm/taosd
......@@ -320,17 +319,6 @@ if [[ $dbName == "taos" ]]; then
mkdir -p ${install_dir}/examples/taosbenchmark-json && cp ${examples_dir}/../tools/taos-tools/example/* ${install_dir}/examples/taosbenchmark-json
fi
# Add web files
if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then
if [ -d "${web_dir}/admin" ] ; then
mkdir -p ${install_dir}/share/
cp -Rfap ${web_dir}/admin ${install_dir}/share/
cp ${web_dir}/png/taos.png ${install_dir}/share/admin/images/taos.png
cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share ||:
else
echo "directory not found for enterprise release: ${web_dir}/admin"
fi
fi
fi
# Copy driver
......
......@@ -1297,13 +1297,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn);
if (addr == 0xffffffff) {
tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
} else {
mgmtEpSet->numOfEps++;
}
}
if (secondEp && secondEp[0] != 0) {
......@@ -1313,12 +1319,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
uint32_t addr = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn);
if (addr == 0xffffffff) {
tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
} else {
mgmtEpSet->numOfEps++;
}
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
}
return 0;
......
......@@ -99,13 +99,20 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
goto End;
}
int updateEpSet = 1;
if (connectRsp.dnodeNum == 1) {
SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet dstEpSet = connectRsp.epSet;
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
} else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
if (srcEpSet.numOfEps == 1) {
rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
dstEpSet.eps[dstEpSet.inUse].fqdn);
updateEpSet = 0;
}
}
if (updateEpSet == 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SEpSet* pOrig = &corEpSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
......
......@@ -523,9 +523,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
return TSDB_CODE_SUCCESS;
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
}
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
......@@ -546,7 +544,6 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return NULL;
}
......@@ -715,7 +712,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
end:
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
if(pCommitFp != NULL) {
pCommitFp(tmq, code, userParam);
}
return;
}
......@@ -2307,6 +2306,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (tmq == NULL) {
tscError("invalid tmq handle, null");
if(cb != NULL) {
cb(tmq, TSDB_CODE_INVALID_PARA, param);
}
return;
}
if (pRes == NULL) { // here needs to commit all offsets.
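The effect of the added guard is that a NULL consumer handle now surfaces TSDB_CODE_INVALID_PARA through the callback instead of failing silently. A minimal caller sketch (names are illustrative):

```c
#include <stdio.h>

#include "taos.h"

static void onCommitAll(tmq_t *tmq, int32_t code, void *param) {
  (void)tmq;
  (void)param;
  printf("commit-all finished: %s\n", code == 0 ? "ok" : tmq_err2str(code));
}

void commitAllOffsets(tmq_t *tmq) {
  // Passing NULL as the message commits all offsets; with the new guard a
  // NULL consumer handle also reaches onCommitAll with TSDB_CODE_INVALID_PARA.
  tmq_commit_async(tmq, NULL, onCommitAll, NULL);
}
```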
......@@ -2410,15 +2412,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
}
int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){
int32_t code = 0;
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto end;
}
int32_t accId = tmq->pTscObj->acctId;
......@@ -2427,17 +2431,17 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
int32_t code = getClientVg(tmq, tname, vgId, &pVg);
code = getClientVg(tmq, tname, vgId, &pVg);
if(code != 0){
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
code = checkWalRange(pOffsetInfo, offset);
if (code != 0) {
taosWUnLockLatch(&tmq->lock);
return code;
goto end;
}
taosWUnLockLatch(&tmq->lock);
......@@ -2445,9 +2449,12 @@ int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
return code;
end:
if(code != 0 && cb != NULL){
cb(tmq, code, param);
}
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
......@@ -2832,6 +2839,7 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){
tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type);
}
tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position);
return position;
}
......@@ -2871,12 +2879,16 @@ int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){
if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){
committed = pOffsetInfo->committedOffset.version;
taosWUnLockLatch(&tmq->lock);
return committed;
goto end;
}
SEpSet epSet = pVg->epSet;
taosWUnLockLatch(&tmq->lock);
return getCommittedFromServer(tmq, tname, vgId, &epSet);
committed = getCommittedFromServer(tmq, tname, vgId, &epSet);
end:
tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed);
return committed;
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
......@@ -2897,7 +2909,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
code = TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_TMQ_INVALID_TOPIC;
goto end;
}
......@@ -3040,7 +3052,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%"PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
......@@ -3078,6 +3090,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
return 0;
}
// the seek interface has to send a msg to the server to cancel the push handle if needed, because the consumer may be in wait status if there is no data to poll
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL || pTopicName == NULL) {
tscError("invalid tmq handle, null");
......@@ -3163,8 +3176,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, tname, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pParam->sem);
......
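The seek, position, and committed-offset APIs touched here compose naturally. Below is a hedged sketch that rewinds every vgroup of a topic to the start of its WAL range, using the `tmq_topic_assignment` fields (`vgId`, `begin`, `end`, `currentOffset`) referenced elsewhere in this change; the error handling is illustrative:

```c
#include <inttypes.h>
#include <stdio.h>

#include "taos.h"

// Rewind every vgroup of a topic to the earliest offset still held in the WAL.
void rewindTopic(tmq_t *tmq, const char *topic) {
  tmq_topic_assignment *pAssign = NULL;
  int32_t numOfAssign = 0;
  int32_t code = tmq_get_topic_assignment(tmq, topic, &pAssign, &numOfAssign);
  if (code != 0) {
    fprintf(stderr, "get assignment failed: %s\n", tmq_err2str(code));
    return;
  }
  for (int32_t i = 0; i < numOfAssign; i++) {
    tmq_topic_assignment *p = &pAssign[i];
    // begin/end bound the WAL version range; currentOffset is the cursor.
    code = tmq_offset_seek(tmq, topic, p->vgId, p->begin);
    if (code != 0) {
      fprintf(stderr, "seek vgId:%d failed: %s\n", p->vgId, tmq_err2str(code));
      continue;
    }
    printf("vgId:%d position:%" PRId64 " committed:%" PRId64 "\n", p->vgId,
           tmq_position(tmq, topic, p->vgId), tmq_committed(tmq, topic, p->vgId));
  }
  tmq_free_assignment(pAssign);
}
```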
......@@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mDebug("tq timer, rebalance counter old val:%d", old);
mInfo("tq timer, rebalance counter old val:%d", old);
return old == 0;
}
......@@ -116,7 +116,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mDebug("rebalance trans end, rebalance counter:%d", newVal);
mInfo("rebalance trans end, rebalance counter:%d", newVal);
break;
}
}
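The guard above is a compare-and-swap on a shared counter: only the caller that moves it from 0 to 1 may start a rebalance, and concurrent timer ticks back off. A portable sketch of the same pattern with C11 atomics follows (TDengine's `atomic_val_compare_exchange_32` wrapper behaves equivalently; this is not the mnode code itself):

```c
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int rebInExecCnt;  // 0 = idle, >0 = a rebalance is in flight

// Exactly one concurrent caller wins the 0 -> 1 transition.
static bool rebTryStart(void) {
  int expected = 0;
  return atomic_compare_exchange_strong(&rebInExecCnt, &expected, 1);
}

// Mirror of mndRebCntDec: release the guard when the rebalance trans ends.
static void rebCntDec(void) {
  atomic_fetch_sub(&rebInExecCnt, 1);
}
```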
......@@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// rebalance cannot be parallel
if (!mndRebTryStart()) {
mDebug("mq rebalance already in progress, do nothing");
mInfo("mq rebalance already in progress, do nothing");
return 0;
}
......@@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);
......@@ -362,7 +362,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered");
mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_DO_REBALANCE,
.pCont = pRebMsg,
......@@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
for(int i = 0; i < taosArrayGetSize(req.topics); i++){
TopicOffsetRows* data = taosArrayGet(req.topics, i);
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
......@@ -1114,13 +1114,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
}
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
sdbRelease(pSdb, pConsumer);
continue;
}
taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId);
mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true;
......
......@@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
int32_t numOfRows = 0;
SMqSubscribeObj *pSub = NULL;
mDebug("mnd show subscriptions begin");
mInfo("mnd show subscriptions begin");
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
......@@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
sdbRelease(pSdb, pSub);
}
mDebug("mnd end show subscriptions");
mInfo("mnd end show subscriptions");
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0;
taosWLockLatch(&pTq->lock);
......@@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return -1;
}
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = NULL;
......
......@@ -547,7 +547,7 @@ void monSendReport() {
monGenGrantJson(pMonitor);
monGenDnodeJson(pMonitor);
monGenDiskJson(pMonitor);
monGenLogJson(pMonitor);
//monGenLogJson(pMonitor); // TS-3691
char *pCont = tjsonToString(pMonitor->pJson);
// uDebugL("report cont:%s\n", pCont);
......
......@@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
}
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
int64_t endVer = TMIN(appliedVer, committedVer);
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t endVer = committedVer;
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
if (fetchVer > endVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
......@@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
pRead->pWal->vers.appliedVer);
// TODO: valid ver
if (ver > pRead->pWal->vers.appliedVer) {
return -1;
}
// if (ver > pRead->pWal->vers.appliedVer) {
// return -1;
// }
if (pRead->curVersion != ver) {
code = walReaderSeekVer(pRead, ver);
......
......@@ -186,33 +186,6 @@ class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
tdLog.exit("total is null!")
# log_infos ====================================
if "log_infos" not in infoDict or infoDict["log_infos"]== None:
tdLog.exit("log_infos is null!")
if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"]) < 8:#!= 10:
tdLog.exit("logs is null!")
if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10:
tdLog.exit("ts is null!")
if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1:
tdLog.exit("content is null!")
if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4:
tdLog.exit("summary is null!")
if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 :
tdLog.exit("total is null!")
if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!")
def do_GET(self):
"""
process GET request
......@@ -315,4 +288,3 @@ class TDTestCase:
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())