Commit ecc043a3 authored by wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TD-14761

......@@ -3,7 +3,7 @@ title: Introduction
toc_max_heading_level: 2
---
TDengine is an open source, high-performance, cloud native [time-series database](https://tdengine.com/tsdb/) optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. Its code, including its cluster feature, is open source under GNU AGPL v3.0. Besides the database engine, it provides [caching](../develop/cache), [stream processing](../develop/stream), [data subscription](../develop/tmq) and other functionalities to reduce the system complexity and cost of development and operation.
TDengine is an open source, high-performance, cloud native [time-series database](https://tdengine.com/tsdb/) optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. Its code, including its cluster feature, is open source under GNU AGPL v3.0. Besides the database engine, it provides [caching](/develop/cache), [stream processing](/develop/stream), [data subscription](/develop/tmq) and other functionalities to reduce the system complexity and cost of development and operation.
This section introduces the major features, competitive advantages, typical use cases, and benchmarks to help you get a high-level overview of TDengine.
......@@ -11,21 +11,33 @@ This section introduces the major features, competitive advantages, typical use-
The major features are listed below:
1. While TDengine supports [using SQL to insert](/develop/insert-data/sql-writing), it also supports [Schemaless writing](/reference/schemaless/) just like NoSQL databases. TDengine also supports standard protocols like [InfluxDB LINE](/develop/insert-data/influxdb-line), [OpenTSDB Telnet](/develop/insert-data/opentsdb-telnet), [OpenTSDB JSON](/develop/insert-data/opentsdb-json) among others.
2. TDengine supports seamless integration with third-party data collection agents like [Telegraf](/third-party/telegraf), [Prometheus](/third-party/prometheus), [StatsD](/third-party/statsd), [collectd](/third-party/collectd), [icinga2](/third-party/icinga2), [TCollector](/third-party/tcollector), [EMQX](/third-party/emq-broker), [HiveMQ](/third-party/hive-mq-broker). These agents can write data into TDengine with simple configuration and without a single line of code.
3. Support for [all kinds of queries](/develop/query-data), including aggregation, nested query, downsampling, interpolation and others.
4. Support for [user defined functions](/develop/udf).
5. Support for [caching](/develop/cache). TDengine always saves the last data point in cache, so Redis is not needed in some scenarios.
6. Support for [continuous query](../develop/stream).
7. Support for [data subscription](../develop/tmq) with the capability to specify filter conditions.
8. Support for [cluster](../deployment/), with the capability of increasing processing power by adding more nodes. High availability is supported by replication.
9. Provides an interactive [command-line interface](/reference/taos-shell) for management, maintenance and ad-hoc queries.
10. Provides many ways to [import](/operation/import) and [export](/operation/export) data.
11. Provides [monitoring](/operation/monitor) on running instances of TDengine.
12. Provides [connectors](/reference/connector/) for [C/C++](/reference/connector/cpp), [Java](/reference/connector/java), [Python](/reference/connector/python), [Go](/reference/connector/go), [Rust](/reference/connector/rust), [Node.js](/reference/connector/node) and other programming languages.
13. Provides a [REST API](/reference/rest-api/).
14. Supports seamless integration with [Grafana](/third-party/grafana) for visualization.
15. Supports seamless integration with Google Data Studio.
1. Insert data
* supports [using SQL to insert](/develop/insert-data/sql-writing) (see the sketch after this list).
* supports [schemaless writing](/reference/schemaless/) just like NoSQL databases. It also supports standard protocols like [InfluxDB LINE](/develop/insert-data/influxdb-line), [OpenTSDB Telnet](/develop/insert-data/opentsdb-telnet), [OpenTSDB JSON](/develop/insert-data/opentsdb-json), among others.
* supports seamless integration with third-party tools like [Telegraf](/third-party/telegraf/), [Prometheus](/third-party/prometheus/), [collectd](/third-party/collectd/), [StatsD](/third-party/statsd/), [TCollector](/third-party/tcollector/) and [icinga2](/third-party/icinga2/). These tools can write data into TDengine with simple configuration and without a single line of code.
2. Query data
* supports standard [SQL](/taos-sql/), including nested queries.
* supports [time series specific functions](/taos-sql/function/#time-series-extensions) and [time series specific queries](/taos-sql/distinguished), like downsampling, interpolation, cumulative sum, time-weighted average, state window, session window and many others.
* supports [user defined functions](/taos-sql/udf).
3. [Caching](/develop/cache/): TDengine always saves the last data point in cache, so Redis is not needed for time-series data processing.
4. [Stream Processing](/develop/stream/): not only is continuous query supported, but TDengine also supports event-driven stream processing, so Flink or Spark is not needed for time-series data processing.
5. [Data Subscription](/develop/tmq/): applications can subscribe to a table or a set of tables. The API is the same as Kafka, but you can specify filter conditions.
6. Visualization
* supports seamless integration with [Grafana](/third-party/grafana/) for visualization.
* supports seamless integration with Google Data Studio.
7. Cluster
* supports [cluster](/deployment/) with the capability of increasing processing power by adding more nodes.
* supports [deployment on Kubernetes](/deployment/k8s/).
* supports high availability via data replication.
8. Administration
* provides [monitoring](/operation/monitor) on running instances of TDengine.
* provides many ways to [import](/operation/import) and [export](/operation/export) data.
9. Tools
* provides an interactive [command-line interface](/reference/taos-shell) for management, maintenance and ad-hoc queries.
* provides a tool [taosBenchmark](/reference/taosbenchmark/) for testing the performance of TDengine.
10. Programming
* provides [connectors](/reference/connector/) for [C/C++](/reference/connector/cpp), [Java](/reference/connector/java), [Python](/reference/connector/python), [Go](/reference/connector/go), [Rust](/reference/connector/rust), [Node.js](/reference/connector/node) and other programming languages.
* provides a [REST API](/reference/rest-api/).
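The sketch below illustrates the insert and query features above using the Python connector that appears later in this commit (`taos.connect`, `conn.query`); the `conn.execute()` calls, the database name, and the table names are assumptions for illustration only, not part of this commit.

```python
import taos

# A locally running TDengine server is assumed; connection parameters mirror checkPackageRuning.py.
conn = taos.connect(host="localhost", user="root", password="taosdata", port=6030)

# Insert data: create a database and a super table, then write one row into a child table via SQL.
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT) TAGS (location BINARY(64))")
conn.execute("INSERT INTO power.d1001 USING power.meters TAGS ('California.SanFrancisco') VALUES (NOW, 10.3, 219)")

# Query data: a downsampling query over the last hour in 10-minute windows.
result = conn.query("SELECT _wstart, AVG(current) FROM power.meters WHERE ts > NOW - 1h INTERVAL(10m)")
for row in result.fetch_all():
    print(row)

conn.close()
```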
For more details on features, please read through the entire documentation.
......
......@@ -9,6 +9,7 @@ TDengine is a cloud-native time-series database that can be deployed on Kubernet
Before deploying TDengine on Kubernetes, perform the following:
* The current steps are compatible with Kubernetes v1.5 and later versions.
* Install and configure minikube, kubectl, and helm.
* Install and deploy Kubernetes and ensure that it can be accessed and used normally. Update any container registries or other services as necessary.
......@@ -100,7 +101,7 @@ spec:
# Must set if you want a cluster.
- name: TAOS_FIRST_EP
value: "$(STS_NAME)-0.$(SERVICE_NAME).$(STS_NAMESPACE).svc.cluster.local:$(TAOS_SERVER_PORT)"
# TAOS_FQND should always be setted in k8s env.
# TAOS_FQDN should always be set in k8s env.
- name: TAOS_FQDN
value: "$(POD_NAME).$(SERVICE_NAME).$(STS_NAMESPACE).svc.cluster.local"
volumeMounts:
......
......@@ -1139,7 +1139,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau
**Applicable parameter values**:
- oper : Can be one of `LT` (lower than), `GT` (greater than), `LE` (lower than or equal to), `GE` (greater than or equal to), `NE` (not equal to), `EQ` (equal to), the value is case insensitive
- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to). The value is case insensitive but must be enclosed in quotes.
- val : Numeric types
**Return value type**: Integer
......@@ -1166,7 +1166,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**Applicable parameter values**:
- oper : Can be one of `LT` (lower than), `GT` (greater than), `LE` (lower than or equal to), `GE` (greater than or equal to), `NE` (not equal to), `EQ` (equal to), the value is case insensitive
- oper : Can be one of `'LT'` (lower than), `'GT'` (greater than), `'LE'` (lower than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to). The value is case insensitive but must be enclosed in quotes.
- val : Numeric types
- unit: The unit of time interval. Enter one of the following options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks). If you do not enter a unit of time, the precision of the current database is used by default.
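A minimal sketch of how the quoted `oper` values are used, again with the Python connector; the table `power.d1001` and the threshold 205 are assumptions for illustration:

```python
import taos

conn = taos.connect(host="localhost", user="root", password="taosdata", port=6030)

# The oper argument is quoted inside the SQL text, e.g. 'GT' rather than GT.
counts = conn.query("SELECT STATECOUNT(voltage, 'GT', 205) FROM power.d1001").fetch_all()

# STATEDURATION additionally takes a unit argument, here 1s (seconds).
durations = conn.query("SELECT STATEDURATION(voltage, 'GT', 205, 1s) FROM power.d1001").fetch_all()

print(counts)
print(durations)
conn.close()
```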
......
......@@ -30,7 +30,7 @@ The following characters cannot occur in a password: single quotation marks ('),
- Maximum number of columns is 4096. There must be at least 2 columns, and the first column must be a timestamp.
- The maximum length of a tag name is 64 bytes.
- Maximum number of tags is 128. There must be at least 1 tag. The total length of tag values cannot exceed 16 KB.
- Maximum length of single SQL statement is 1 MB (1048576 bytes). It can be configured in the parameter `maxSQLLength` in the client side, the applicable range is [65480, 1048576].
- Maximum length of single SQL statement is 1 MB (1048576 bytes).
- At most 4096 columns can be returned by `SELECT`. Functions in the query statement constitute columns. An error is returned if the limit is exceeded.
- The maximum numbers of databases, STables, and tables depend only on system resources.
- The number of replicas can only be 1 or 3.
......
......@@ -10,6 +10,7 @@ description: A detailed guide to deploying a TDengine cluster with Kubernetes
To deploy and manage a TDengine cluster with Kubernetes, the following preparations are required.
* This document applies to Kubernetes v1.5 and later versions.
* This chapter and the next use tools such as minikube, kubectl, and helm for installation and deployment; please install them in advance.
* Kubernetes has been installed and deployed and can be accessed and used normally; update any necessary container registries or other services as needed.
......
......@@ -1167,7 +1167,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**Applicable parameter values**:
- oper : Can be one of "LT" (less than), "GT" (greater than), "LE" (less than or equal to), "GE" (greater than or equal to), "NE" (not equal to), "EQ" (equal to); the value is case insensitive
- oper : Can be one of `'LT'` (less than), `'GT'` (greater than), `'LE'` (less than or equal to), `'GE'` (greater than or equal to), `'NE'` (not equal to), `'EQ'` (equal to); the value is case insensitive but must be enclosed in `''`.
- val : Numeric types
- unit : The unit of the time interval. Options: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), 1w (weeks). If omitted, the precision of the current database is used by default.
......
......@@ -31,7 +31,7 @@ description: Valid character sets and naming restrictions
- A maximum of 4096 columns is allowed; at least 2 columns are required, and the first column must be a timestamp.
- The maximum length of a tag name is 64 bytes.
- A maximum of 128 tags is allowed; at least 1 tag is required, and the total length of tag values in a table cannot exceed 16 KB.
- The maximum length of a single SQL statement is 1048576 characters; it can also be modified via the client configuration parameter maxSQLLength, within the range 65480 to 1048576.
- The maximum length of a single SQL statement is 1048576 characters.
- A SELECT statement can return at most 4096 columns (function calls in the statement may also occupy some column space); if the limit is exceeded, explicitly specify fewer columns to avoid execution errors.
- The numbers of databases, supertables, and tables are not limited by the system and depend only on system resources.
- The number of database replicas can only be set to 1 or 3.
......
......@@ -139,9 +139,8 @@ typedef struct SqlFunctionCtx {
struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
int32_t curBufPage;
bool increase;
bool isStream;
char udfName[TSDB_FUNC_NAME_LEN];
......
......@@ -256,8 +256,9 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentR
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
#ifdef __cplusplus
}
......
......@@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pBuf
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf);
/**
* get the specified buffer page by id
......@@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
*/
size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
/**
* get the number of groups in the result buffer
* @param pBuf
* @return
*/
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf);
/**
* destroy result buffer
* @param pBuf
......
def sync_source(branch_name) {
sh '''
hostname
env
echo ''' + branch_name + '''
'''
sh '''
cd ${TDINTERNAL_ROOT_DIR}
git reset --hard
git fetch || git fetch
git checkout ''' + branch_name + ''' -f
git branch
git pull || git pull
git log | head -n 20
cd ${TDENGINE_ROOT_DIR}
git reset --hard
git fetch || git fetch
git checkout ''' + branch_name + ''' -f
git branch
git pull || git pull
git log | head -n 20
git submodule update --init --recursive
'''
return 1
}
def run_test() {
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
'''
sh '''
export LD_LIBRARY_PATH=${TDINTERNAL_ROOT_DIR}/debug/build/lib
./fulltest.sh
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/tests
./test-all.sh b1fq
'''
}
def build_run() {
sync_source("${BRANCH_NAME}")
}
pipeline {
agent none
parameters {
string (
name:'version',
defaultValue:'3.0.0.1',
description: 'release version number, e.g. 3.0.0.1 or 3.0.0.'
)
string (
name:'baseVersion',
defaultValue:'3.0.0.1',
description: 'The baseVersion number is generally not modified. Now it is 3.0.0.1'
)
}
environment{
WORK_DIR = '/var/lib/jenkins/workspace'
TDINTERNAL_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal'
TDENGINE_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal/community'
BRANCH_NAME = '3.0'
TD_SERVER_TAR = "TDengine-server-${version}-Linux-x64.tar.gz"
BASE_TD_SERVER_TAR = "TDengine-server-${baseVersion}-arm64-x64.tar.gz"
TD_SERVER_ARM_TAR = "TDengine-server-${version}-Linux-arm64.tar.gz"
BASE_TD_SERVER_ARM_TAR = "TDengine-server-${baseVersion}-Linux-arm64.tar.gz"
TD_SERVER_LITE_TAR = "TDengine-server-${version}-Linux-x64-Lite.tar.gz"
BASE_TD_SERVER_LITE_TAR = "TDengine-server-${baseVersion}-Linux-x64-Lite.tar.gz"
TD_CLIENT_TAR = "TDengine-client-${version}-Linux-x64.tar.gz"
BASE_TD_CLIENT_TAR = "TDengine-client-${baseVersion}-arm64-x64.tar.gz"
TD_CLIENT_ARM_TAR = "TDengine-client-${version}-Linux-arm64.tar.gz"
BASE_TD_CLIENT_ARM_TAR = "TDengine-client-${baseVersion}-Linux-arm64.tar.gz"
TD_CLIENT_LITE_TAR = "TDengine-client-${version}-Linux-x64-Lite.tar.gz"
BASE_TD_CLIENT_LITE_TAR = "TDengine-client-${baseVersion}-Linux-x64-Lite.tar.gz"
TD_SERVER_RPM = "TDengine-server-${version}-Linux-x64.rpm"
TD_SERVER_DEB = "TDengine-server-${version}-Linux-x64.deb"
TD_SERVER_EXE = "TDengine-server-${version}-Windows-x64.exe"
TD_CLIENT_EXE = "TDengine-client-${version}-Windows-x64.exe"
}
stages {
stage ('RUN') {
stage('get check package scripts'){
agent{label 'ubuntu18'}
steps {
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
script{
sync_source("${BRANCH_NAME}")
}
}
}
}
parallel {
stage('ubuntu16') {
agent{label " ubuntu16 "}
steps {
timeout(time: 3, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('ubuntu18') {
agent{label " ubuntu18 "}
steps {
timeout(time: 3, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_DEB} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('centos7') {
agent{label " centos7_9 "}
steps {
timeout(time: 240, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
stage('centos8') {
agent{label " centos8_3 "}
steps {
timeout(time: 240, unit: 'MINUTES'){
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
'''
}
}
}
}
}
}
}
\ No newline at end of file
#!/usr/bin/python
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# install pip
# pip install src/connector/python/
# -*- coding: utf-8 -*-
import sys, os
import getopt
import subprocess
# from this import d
import time
# install taospy
out = subprocess.getoutput("pip3 show taospy|grep Version| awk -F ':' '{print $2}' ")
print(out)
if (out == ""):
    os.system("pip install git+https://github.com/taosdata/taos-connector-python.git")
    print("install taos python connector")
# start taosd prepare
os.system("rm -rf /var/lib/taos/*")
os.system("systemctl restart taosd ")
# wait a moment ,at least 5 seconds
time.sleep(5)
# prepare data by taosBenchmark
os.system("taosBenchmark -y -n 100 -t 100")
import taos
conn = taos.connect(host="localhost",
user="root",
password="taosdata",
database="test",
port=6030,
config="/etc/taos", # for windows the default value is C:\TDengine\cfg
timezone="Asia/Shanghai") # default your host's timezone
server_version = conn.server_info
print("server_version", server_version)
client_version = conn.client_info
print("client_version", client_version) # 3.0.0.0
# Execute a sql and get its result set. It's useful for SELECT statement
result: taos.TaosResult = conn.query("SELECT count(*) from test.meters")
data = result.fetch_all()
if data[0][0] != 10000:
    print(" taosBenchmark did not work as expected ")
    sys.exit(1)
else:
    print(" taosBenchmark worked as expected ")
# test taosdump dump out data and dump in data
# dump out data
os.system("taosdump --version")
os.system("mkdir -p /tmp/dumpdata")
os.system("rm -rf /tmp/dumpdata/*")
# dump data out
print("taosdump dump out data")
os.system("taosdump -o /tmp/dumpdata -D test -y ")
# drop database of test
print("drop database test")
os.system(" taos -s ' drop database test ;' ")
# dump data in
print("taosdump dump data in")
os.system("taosdump -i /tmp/dumpdata -y ")
result = conn.query("SELECT count(*) from test.meters")
data = result.fetch_all()
if data[0][0] != 10000:
    print(" taosdump did not work as expected ")
    sys.exit(1)
else:
    print(" taosdump worked as expected ")
conn.close()
\ No newline at end of file
#!/bin/sh
# function installPkgAndCheckFile{
echo "Download package"
packgeName=$1
version=$2
originPackageName=$3
originversion=$4
testFile=$5
subFile="taos.tar.gz"
if [ ${testFile} = "server" ];then
tdPath="TDengine-server-${version}"
originTdpPath="TDengine-server-${originversion}"
installCmd="install.sh"
elif [ ${testFile} = "client" ];then
tdPath="TDengine-client-${version}"
originTdpPath="TDengine-client-${originversion}"
installCmd="install_client.sh"
elif [ ${testFile} = "tools" ];then
tdPath="taosTools-${version}"
originTdpPath="taosTools-${originversion}"
installCmd="install-taostools.sh"
fi
echo "Uninstall all components of TDeingne"
if command -v rmtaos ;then
echo "uninstall all components of TDeingne:rmtaos"
echo " "
else
echo "os doesn't include TDengine "
fi
if command -v rmtaostools ;then
echo "uninstall all components of TDeingne:rmtaostools"
echo " "
else
echo "os doesn't include rmtaostools "
fi
echo "new workroom path"
installPath="/usr/local/src/packageTest"
oriInstallPath="/usr/local/src/packageTest/3.1"
if [ ! -d ${installPath} ] ;then
mkdir -p ${installPath}
else
echo "${installPath} already exists"
fi
if [ ! -d ${oriInstallPath} ] ;then
mkdir -p ${oriInstallPath}
else
echo "${oriInstallPath} already exists"
fi
echo "decompress installPackage"
cd ${installPath}
wget https://www.taosdata.com/assets-download/3.0/${packgeName}
cd ${oriInstallPath}
wget https://www.taosdata.com/assets-download/3.0/${originPackageName}
if [[ ${packgeName} =~ "deb" ]];then
echo "dpkg ${packgeName}" && dpkg -i ${packgeName}
elif [[ ${packgeName} =~ "rpm" ]];then
echo "rpm ${packgeName}" && rpm -ivh ${packgeName}
elif [[ ${packgeName} =~ "tar" ]];then
echo "tar ${packgeName}" && tar -xvf ${packgeName}
cd ${oriInstallPath}
echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName}
cd ${installPath}
echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName}
if [ ${testFile} != "tools" ] ;then
cd ${installPath}/${tdPath} && tar vxf ${subFile}
cd ${oriInstallPath}/${originTdpPath} && tar vxf ${subFile}
fi
echo "check installPackage File"
cd ${installPath}
tree ${oriInstallPath}/${originTdpPath} > ${originPackageName}_checkfile
tree ${installPath}/${tdPath} > ${packgeName}_checkfile
diff ${packgeName}_checkfile ${originPackageName}_checkfile > ${installPath}/diffFile.log
diffNumbers=`cat ${installPath}/diffFile.log |wc -l `
if [ ${diffNumbers} != 0 ];then
echo "The number and names of files have changed from the previous installation package"
echo `cat ${installPath}/diffFile.log`
exit -1
fi
cd ${installPath}/${tdPath}
if [ ${testFile} = "server" ];then
bash ${installCmd} -e no
else
bash ${installCmd}
fi
fi
# }
# installPkgAndCheckFile
......@@ -1707,8 +1707,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(dataBlocks, pBlock);
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, &pBlock);
blockDebugShowDataBlocks(dataBlocks, flag);
taosArrayDestroy(dataBlocks);
}
......
......@@ -89,14 +89,14 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
return -1;
}
mDebug("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
......@@ -365,7 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
......@@ -392,7 +392,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
mDebug("mnode:%d, start to create", createReq.dnodeId);
mInfo("mnode:%d, start to create", createReq.dnodeId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
goto _OVER;
}
......@@ -574,7 +574,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -597,7 +597,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
mDebug("mnode:%d, start to drop", dropReq.dnodeId);
mInfo("mnode:%d, start to drop", dropReq.dnodeId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}
......@@ -732,7 +732,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
}
}
mTrace("trans:-1, sync reconfig will be proposed");
mInfo("trans:-1, sync reconfig will be proposed");
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->standby = 0;
......
......@@ -50,7 +50,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code;
mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p",
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
pRaw);
......@@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else {
mDebug("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
......@@ -88,7 +88,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
}
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mDebug("start to read snapshot from sdb in atomic way");
mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
&pSnapshot->lastConfigIndex);
......@@ -118,7 +118,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = cbMeta.code;
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term);
taosWLockLatch(&pMgmt->lock);
......@@ -126,7 +126,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode));
} else {
mDebug("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
}
pMgmt->transId = 0;
......@@ -136,13 +136,13 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
}
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
mDebug("start to read snapshot from sdb");
mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
}
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
mDebug("stop to read snapshot from sdb");
mInfo("stop to read snapshot from sdb");
SMnode *pMnode = pFsm->data;
return sdbStopRead(pMnode->pSdb, pReader);
}
......@@ -174,12 +174,12 @@ int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1);
mDebug("vgId:1, mnode leader transfer finish");
mInfo("vgId:1, mnode leader transfer finish");
}
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
mDebug("vgId:1, become follower and post sem");
mInfo("vgId:1, become follower and post sem");
taosWLockLatch(&pMnode->syncMgmt.lock);
if (pMnode->syncMgmt.transId != 0) {
......@@ -190,7 +190,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
}
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
mDebug("vgId:1, become leader");
mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data;
}
......@@ -228,7 +228,7 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo.isStandBy = pMgmt->standby;
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
mDebug("start to open mnode sync, standby:%d", pMgmt->standby);
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
if (pMgmt->standby || pMgmt->replica.id > 0) {
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = 1;
......@@ -236,7 +236,7 @@ int32_t mndInitSync(SMnode *pMnode) {
SNodeInfo *pNode = &pCfg->nodeInfo[0];
tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMgmt->replica.port;
mDebug("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
}
tsem_init(&pMgmt->syncSem, 0, 0);
......@@ -284,7 +284,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return -1;
} else {
pMgmt->transId = transId;
mDebug("trans:%d, will be proposed", pMgmt->transId);
mInfo("trans:%d, will be proposed", pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
}
......@@ -314,7 +314,7 @@ void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
mInfo("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
}
void mndSyncStop(SMnode *pMnode) {
......
......@@ -456,11 +456,11 @@ static const char *mndTransStr(ETrnStage stage) {
}
static void mndTransTestStartFunc(SMnode *pMnode, void *param, int32_t paramLen) {
mDebug("test trans start, param:%s, len:%d", (char *)param, paramLen);
mInfo("test trans start, param:%s, len:%d", (char *)param, paramLen);
}
static void mndTransTestStopFunc(SMnode *pMnode, void *param, int32_t paramLen) {
mDebug("test trans stop, param:%s, len:%d", (char *)param, paramLen);
mInfo("test trans stop, param:%s, len:%d", (char *)param, paramLen);
}
static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
......@@ -707,7 +707,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname1) == 0) {
mDebug("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
if (pTrans->pRpcArray == NULL) {
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
}
......@@ -746,7 +746,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
mInfo("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -755,7 +755,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
sdbFreeRaw(pRaw);
mDebug("trans:%d, sync finished", pTrans->id);
mInfo("trans:%d, sync finished", pTrans->id);
return 0;
}
......@@ -821,12 +821,12 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
mDebug("trans:%d, prepare transaction", pTrans->id);
mInfo("trans:%d, prepare transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, prepare finished", pTrans->id);
mInfo("trans:%d, prepare finished", pTrans->id);
STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
if (pNew == NULL) {
......@@ -847,22 +847,22 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, commit transaction", pTrans->id);
mInfo("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, commit finished", pTrans->id);
mInfo("trans:%d, commit finished", pTrans->id);
return 0;
}
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, rollback transaction", pTrans->id);
mInfo("trans:%d, rollback transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, rollback finished", pTrans->id);
mInfo("trans:%d, rollback finished", pTrans->id);
return 0;
}
......@@ -894,7 +894,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
mInfo("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pInfo->ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
......@@ -902,13 +902,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
if (pDb != NULL) {
for (int32_t j = 0; j < 12; j++) {
bool ready = mndIsDbReady(pMnode, pDb);
if (!ready) {
mDebug("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
taosMsleep(1000);
} else {
break;
......@@ -978,7 +978,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
pAction->errCode = pRsp->code;
}
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
mndTransExecute(pMnode, pTrans);
......@@ -994,10 +994,10 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
mInfo("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
pAction->id, pAction->epSet.inUse);
} else {
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
mInfo("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
}
pAction->errCode = 0;
}
......@@ -1024,7 +1024,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
pAction->rawWritten = true;
pAction->errCode = 0;
code = 0;
mDebug("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
mInfo("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
pTrans->lastAction = pAction->id;
......@@ -1073,7 +1073,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
......@@ -1100,7 +1100,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
pAction->rawWritten = 0;
pAction->errCode = 0;
mDebug("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
......@@ -1160,7 +1160,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
pTrans->lastMsgType = 0;
memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
pTrans->lastErrorNo = 0;
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
mInfo("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0;
} else {
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
......@@ -1175,7 +1175,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode;
}
} else {
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions);
mInfo("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
......@@ -1221,7 +1221,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
code = pAction->errCode;
mndTransResetAction(pMnode, pTrans, pAction);
} else {
mDebug("trans:%d, %s:%d execute successfully", pTrans->id, mndTransStr(pAction->stage), action);
mInfo("trans:%d, %s:%d execute successfully", pTrans->id, mndTransStr(pAction->stage), action);
}
} else {
code = TSDB_CODE_ACTION_IN_PROGRESS;
......@@ -1230,7 +1230,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
} else {
mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
mInfo("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
}
} else {
}
......@@ -1254,7 +1254,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if (code == 0) {
pTrans->code = 0;
pTrans->redoActionPos++;
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id);
code = mndTransSync(pMnode, pTrans);
if (code != 0) {
......@@ -1263,17 +1263,17 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
mndTransStr(pAction->stage), pAction->id, terrstr());
}
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
mInfo("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
break;
} else if (code == pAction->retryCode) {
mDebug("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code);
mInfo("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code);
taosMsleep(300);
action--;
continue;
} else {
terrno = code;
pTrans->code = code;
mDebug("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
break;
}
......@@ -1285,7 +1285,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
mInfo("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
}
......@@ -1304,10 +1304,10 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT;
mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
mInfo("trans:%d, stage from redoAction to commit", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false;
} else {
pTrans->failedTimes++;
......@@ -1347,7 +1347,7 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT_ACTION;
mDebug("trans:%d, stage from commit to commitAction", pTrans->id);
mInfo("trans:%d, stage from commit to commitAction", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
......@@ -1366,7 +1366,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from commitAction to finished", pTrans->id);
mInfo("trans:%d, stage from commitAction to finished", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
......@@ -1384,10 +1384,10 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from undoAction to finished", pTrans->id);
mInfo("trans:%d, stage from undoAction to finished", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
continueExec = false;
} else {
pTrans->failedTimes++;
......@@ -1406,7 +1406,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mDebug("trans:%d, stage from rollback to undoAction", pTrans->id);
mInfo("trans:%d, stage from rollback to undoAction", pTrans->id);
continueExec = true;
} else {
pTrans->failedTimes++;
......@@ -1431,7 +1431,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
}
mDebug("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
mInfo("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec;
}
......@@ -1439,7 +1439,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
while (continueExec) {
mDebug("trans:%d, continue to execute, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
mInfo("trans:%d, continue to execute, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:
......
......@@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader;
#define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL 2
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_TYPE_ALL 0x1
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
......@@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
......
......@@ -18,7 +18,7 @@
#include "tcommon.h"
#include "tsdb.h"
typedef struct SLastrowReader {
typedef struct SCacheRowsReader {
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
......@@ -27,9 +27,9 @@ typedef struct SLastrowReader {
int32_t type;
int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list
} SLastrowReader;
} SCacheRowsReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
......@@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
pBlock->info.rows += 1;
}
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
p->pTableList = pTableIdList;
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
if (p->transferBuf[i] == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
......@@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
return TSDB_CODE_SUCCESS;
}
int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader;
int32_t tsdbCacherowsReaderClose(void* pReader) {
SCacheRowsReader* p = pReader;
if (p->pSchema != NULL) {
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
......@@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) {
return TSDB_CODE_SUCCESS;
}
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
}
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
}
}
return code;
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SLastrowReader* pr = pReader;
SCacheRowsReader* pr = pReader;
int32_t code = TSDB_CODE_SUCCESS;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list.
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
......@@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
tsdbCacheRelease(lruCache, h);
}
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (h == NULL) {
continue;
}
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
......
......@@ -16,9 +16,9 @@
#include "osDef.h"
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define DEFAULT_ROW_INDEX_VAL (-1)
#define INITIAL_ROW_INDEX_VAL (-1)
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
......@@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey;
......@@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->fileDelIndex = -1;
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->lastBlockDelIndex = INITIAL_ROW_INDEX_VAL;
}
}
......@@ -414,7 +416,7 @@ _err:
return false;
}
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->order = order;
pIter->index = -1;
pIter->numOfBlocks = 0;
......@@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb
} else {
taosArrayClear(pIter->blockList);
}
pIter->pTableMap = pTableMap;
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
......@@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
}
// reset the index in last block when handing a new file
px->indexInBlockL = DEFAULT_ROW_INDEX_VAL;
px->indexInBlockL = INITIAL_ROW_INDEX_VAL;
tMapDataClear(&px->mapData);
taosArrayClear(px->pBlockList);
}
......@@ -887,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList);
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
......@@ -2403,7 +2405,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
int32_t index = pScanInfo->indexInBlockL;
if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
if (!hasData) { // current table does not have rows in last block, try next table
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
......@@ -2470,7 +2472,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// note: the lastblock may be null here
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
}
}
......@@ -2582,7 +2584,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
} else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockIterator(pBlockIter, pReader->order);
}
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
......@@ -2654,7 +2656,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
initBlockDumpInfo(pReader, pBlockIter);
} else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
......@@ -3276,7 +3278,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};
STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
return TDB_CODE_SUCCESS;
}
......@@ -3372,7 +3374,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
......@@ -3393,7 +3395,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
// no data in files, let's try buffer in memory
if (pPrevReader->status.fileIter.numOfFiles == 0) {
......@@ -3696,7 +3698,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose(&pReader->pFileReader);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
resetDataBlockScanInfo(pReader->status.pTableMap);
int32_t code = 0;
......
......@@ -22,6 +22,7 @@
#include "tbuffer.h"
#include "tcommon.h"
#include "tpagedbuf.h"
#include "tsimplehash.h"
#define T_LONG_JMP(_obj, _c) \
do { \
......@@ -106,7 +107,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo
setBufPageDirty(pPage, true);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
......
......@@ -299,10 +299,10 @@ enum {
};
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on block-wise disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on block-wise disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
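The result-row index in SAggSupporter now lives in the lock-free SSHashObj instead of the general SHashObj. A minimal sketch (not part of this diff) of the matching create/put/get/cleanup calls, following the doInitAggInfoSup and cleanupAggSup hunks further down in this commit; the hash-function choice here is an assumption for illustration.

```c
/* Sketch only, not part of this diff: how the SSHashObj-backed result-row
 * table is created, used and released, mirroring the executor hunks below. */
static int32_t initResultRowTableSketch(SAggSupporter* pAggSup) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); /* assumed hash fn */
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);            /* was taosHashInit(..., HASH_NO_LOCK) */
  return (pAggSup->pResultRowHashTable != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

static void recordResultRowSketch(SAggSupporter* pAggSup, char* keyBuf, int32_t keyLen, SResultRowPosition pos) {
  tSimpleHashPut(pAggSup->pResultRowHashTable, keyBuf, keyLen, &pos, sizeof(SResultRowPosition));
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, keyBuf, keyLen);
  (void)p1; /* lookups elsewhere in this commit follow the same cast pattern */
}

static void cleanupResultRowTableSketch(SAggSupporter* pAggSup) {
  tSimpleHashCleanup(pAggSup->pResultRowHashTable); /* replaces taosHashCleanup */
}
```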
typedef struct {
......@@ -924,7 +924,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
......
......@@ -28,7 +28,7 @@ typedef void (*_hash_free_fn_t)(void *);
/**
* @brief single thread hash
*
*
*/
typedef struct SSHashObj SSHashObj;
......@@ -52,13 +52,13 @@ int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/**
* @brief put element into hash table, if the element with the same key exists, update it
*
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param dataLen
* @return int32_t
*
* @param pHashObj
* @param key
* @param keyLen
* @param data
* @param dataLen
* @return int32_t
*/
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
......@@ -80,6 +80,18 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
*/
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
/**
* Remove the item with the specified key during hash table iteration
*
* @param pHashObj
* @param key
* @param keyLen
* @param pIter
* @param iter
* @return int32_t
*/
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter);
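A minimal usage sketch for the new remove-during-iteration call (not part of this diff), following the declaration above and the way closeIntervalWindow uses it later in this commit. The predicate is hypothetical, and production code rebuilds the key into a local buffer before removing.

```c
/* Sketch: drop matching entries while walking the table. Passing pIter/iter
 * back lets the iteration continue from the correct slot after the node is
 * unlinked. shouldDrop() is a hypothetical predicate for illustration. */
static void dropMatchingEntriesSketch(SSHashObj* pHashMap) {
  void*   pIte = NULL;
  int32_t iter = 0;
  size_t  keyLen = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
    if (shouldDrop(key, keyLen)) {
      tSimpleHashIterateRemove(pHashMap, key, keyLen, &pIte, &iter);
    }
  }
}
```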
/**
* Clear the hash table.
* @param pHashObj
......@@ -99,13 +111,27 @@ void tSimpleHashCleanup(SSHashObj *pHashObj);
*/
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
#pragma pack(push, 4)
typedef struct SHNode{
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[];
} SHNode;
#pragma pack(pop)
/**
* Get the key for a given data entry in the hash table
* @param data
* @param keyLen
* @return
*/
void *tSimpleHashGetKey(void *data, size_t* keyLen);
static FORCE_INLINE void *tSimpleHashGetKey(void *data, size_t *keyLen) {
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
if (keyLen) *keyLen = node->keyLen;
return POINTER_SHIFT(data, node->dataLen);
}
/**
* Create the hash table iterator
......@@ -116,17 +142,6 @@ void *tSimpleHashGetKey(void *data, size_t* keyLen);
*/
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
/**
* Create the hash table iterator
*
* @param pHashObj
* @param data
* @param key
* @param iter
* @return void*
*/
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
#ifdef __cplusplus
}
#endif
......
......@@ -25,24 +25,27 @@
#include "thash.h"
#include "ttypes.h"
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator);
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID);
int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL;
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW;
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE;
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW;
}
if (pScanNode->scan.pScanPseudoCols != NULL) {
......@@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
pTaskInfo->code = code;
destroyLastrowScanOperator(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
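The retrieve mode of the renamed cache-scan operator is now a bit mask that combines a scope flag (ALL vs SINGLE) with CACHESCAN_RETRIEVE_LAST_ROW, replacing the old LASTROW_RETRIEVE_TYPE_* values. A hedged sketch of the idiom with placeholder flag values; the real definitions live elsewhere in the tree, not in this hunk.

```c
/* Sketch only: illustrative flag values, NOT the definitions from this commit. */
#define CACHESCAN_RETRIEVE_TYPE_ALL    0x1
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW    0x4

static void retrieveFlagSketch(void) {
  int32_t retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | CACHESCAN_RETRIEVE_LAST_ROW;
  /* the group-by-tbname fast path tests the scope bit, exactly as doScanCache does below */
  if ((retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    /* open one cache-rows reader for the whole table list and buffer the result */
  }
}
```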
SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes);
// check if it is a group by tbname
if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) {
if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// check for tag values
......@@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
......@@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
}
}
tsdbLastrowReaderClose(pInfo->pLastrowReader);
tsdbCacherowsReaderClose(pInfo->pLastrowReader);
return pInfo->pRes;
}
}
......
......@@ -83,7 +83,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
......@@ -92,9 +92,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
void* pData = NULL;
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0;
while ((pData = taosHashIterate(pHashmap, pData)) != NULL) {
void* key = taosHashGetKey(pData, &keyLen);
size_t keyLen = 0;
int32_t iter = 0;
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
......@@ -1218,7 +1219,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false;
pCtx->isStream = false;
pCtx->param = pFunct->pParam;
......
......@@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);
SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
......@@ -234,7 +234,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = NULL;
......@@ -273,7 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition));
}
......@@ -282,7 +282,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
......@@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, tid);
SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tid, &pageId);
......@@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
// the _wstart needs to be copied to the 20 following rows, since the results of top-k expand to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
if (pCtx[j].increase) {
int64_t ts = *(int64_t*)in;
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
......@@ -3011,7 +3003,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
......@@ -3038,10 +3030,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
while (pIter) {
void* key = taosHashGetKey(pIter, &keyLen);
int32_t iter = 0;
void* pIter = NULL;
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
void* key = tSimpleHashGetKey(pIter, &keyLen);
SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
......@@ -3072,8 +3065,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
offset += sizeof(int32_t);
memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize;
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
}
*(int32_t*)(*result) = offset;
......@@ -3108,7 +3099,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
// add a new result set for a new group
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset);
......@@ -3225,6 +3216,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
blockDataCleanup(pInfo->pRes);
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
......@@ -3287,7 +3279,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pInfo->pFinalRes;
blockDataCleanup(pResBlock);
blockDataCleanup(pInfo->pRes);
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
......@@ -3311,6 +3302,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
blockDataCleanup(pInfo->pRes);
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
......@@ -3353,7 +3346,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
assert(pBlock != NULL);
blockDataCleanup(pResBlock);
blockDataCleanup(pInfo->pRes);
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) {
......@@ -3452,7 +3444,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -3479,7 +3471,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
void cleanupAggSup(SAggSupporter* pAggSup) {
taosMemoryFreeClear(pAggSup->keyBuf);
taosHashCleanup(pAggSup->pResultRowHashTable);
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
......@@ -4139,7 +4131,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else {
......
......@@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) {
return NULL;
......
......@@ -33,11 +33,16 @@ typedef struct SPullWindowInfo {
uint64_t groupId;
} SPullWindowInfo;
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
} SOpenWindowInfo;
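Open (not yet interpolated) windows now carry their owning group id, so the openWindow list is created with tdListNew(sizeof(SOpenWindowInfo)). A small sketch (not part of this diff) of the append/read pattern that addToOpenWindowList and doInterpUnclosedTimeWindow implement below; all types come from headers this file already includes.

```c
/* Sketch: record an open window together with its group id, then read the
 * list head back. Mirrors addToOpenWindowList/doInterpUnclosedTimeWindow. */
static void rememberOpenWindowSketch(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
  SOpenWindowInfo openWin = {0};
  openWin.pos.pageId = pResult->pageId;
  openWin.pos.offset = pResult->offset;
  openWin.groupId    = groupId;
  tdListAppend(pResultRowInfo->openWindow, &openWin);
}

static uint64_t headOpenWindowGroupSketch(SResultRowInfo* pResultRowInfo) {
  SListNode*       pn       = tdListGetHead(pResultRowInfo->openWindow);
  SOpenWindowInfo* pOpenWin = (SOpenWindowInfo*)pn->data;
  /* interpolation is applied only when this matches the incoming block's groupId */
  return pOpenWin->groupId;
}
```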
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
......@@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
uint64_t groupId = pBlock->info.groupId;
SResultRow* pResult = NULL;
while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SResultRowPosition* p1 = (SResultRowPosition*)pn->data;
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) {
break;
}
......@@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
int64_t prevTs = *(int64_t*)pTsKey->pData;
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
if (groupId == pBlock->info.groupId) {
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs);
......@@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
......@@ -1017,10 +1025,18 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
//TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
(!ascScan && ekey >= pBlock->info.window.skey)) {
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
} else if (pInfo->timeWindowInterpo) {
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
numOfOutput);
......@@ -1040,20 +1056,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) {
SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
SOpenWindowInfo openWin = {0};
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &pos);
return pos;
tdListAppend(pResultRowInfo->openWindow, &openWin);
return openWin.pos;
}
SResultRowPosition* px = (SResultRowPosition*)pn->data;
if (px->pageId != pos.pageId || px->offset != pos.offset) {
tdListAppend(pResultRowInfo->openWindow, &pos);
SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data;
if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
tdListAppend(pResultRowInfo->openWindow, &openWin);
}
return pos;
return openWin.pos;
}
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
......@@ -1380,7 +1399,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) {
// window has been closed
return false;
......@@ -1393,14 +1412,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (!p1) {
// window has been closed
return false;
}
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true;
}
......@@ -1450,11 +1469,13 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
}
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
......@@ -1467,14 +1488,15 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
return TSDB_CODE_SUCCESS;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
......@@ -1512,7 +1534,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
}
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter);
}
}
return TSDB_CODE_SUCCESS;
......@@ -1808,7 +1830,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) {
pCtx[0].increase = true;
// pCtx[0].increase = true;
}
}
......@@ -1884,7 +1906,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
......@@ -2855,7 +2877,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
int32_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return p1 != NULL;
}
......@@ -2896,7 +2918,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
return p1 == NULL;
}
......@@ -3025,7 +3047,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable);
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
}
......@@ -4926,14 +4948,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
return TSDB_CODE_SUCCESS;
}
......@@ -4956,7 +4978,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
// there is an result exists
if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
if (ts != miaInfo->curTs) {
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
......@@ -4964,7 +4986,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
}
} else {
miaInfo->curTs = ts;
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
}
STimeWindow win = {0};
......@@ -5040,7 +5062,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
// close last unfinalized time window
if (miaInfo->curTs != INT64_MIN) {
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
miaInfo->curTs = INT64_MIN;
}
......@@ -5157,7 +5179,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
......@@ -5221,12 +5243,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS;
}
......@@ -5292,7 +5314,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
......@@ -5467,9 +5489,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
......
......@@ -31,21 +31,12 @@
taosMemoryFreeClear(_n); \
} while (0);
#pragma pack(push, 4)
typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[];
} SHNode;
#pragma pack(pop)
struct SSHashObj {
SHNode **hashList;
size_t capacity; // number of slots
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function
};
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
......@@ -76,7 +67,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) {
taosMemoryFree(pHashObj);
......@@ -285,6 +275,44 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
return TSDB_CODE_SUCCESS;
}
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) {
if (!pHashObj || !key) {
return TSDB_CODE_FAILED;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot];
SHNode *pPrev = NULL;
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if (!pPrev) {
pHashObj->hashList[slot] = pNode->next;
} else {
pPrev->next = pNode->next;
}
if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) {
if (!pPrev) {
*pIter = NULL;
} else {
*pIter = GET_SHASH_NODE_DATA(pPrev);
}
}
FREE_HASH_NODE(pNode);
atomic_sub_fetch_64(&pHashObj->size, 1);
break;
}
pPrev = pNode;
pNode = pNode->next;
}
return TSDB_CODE_SUCCESS;
}
void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return;
......@@ -302,6 +330,7 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
FREE_HASH_NODE(pNode);
pNode = pNext;
}
pHashObj->hashList[i] = NULL;
}
atomic_store_64(&pHashObj->size, 0);
}
......@@ -324,15 +353,6 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
}
void *tSimpleHashGetKey(void *data, size_t *keyLen) {
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
if (keyLen) {
*keyLen = node->keyLen;
}
return POINTER_SHIFT(data, node->dataLen);
}
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
if (!pHashObj) {
return NULL;
......@@ -341,7 +361,7 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
SHNode *pNode = NULL;
if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
......@@ -368,52 +388,5 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
if (!pHashObj) {
return NULL;
}
SHNode *pNode = NULL;
if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) {
if (key) {
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
}
return GET_SHASH_NODE_DATA(pNode->next);
}
++(*iter);
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
}
\ No newline at end of file
......@@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
return pSortHandle;
}
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy(pSource->src.pBlock);
......@@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
return TSDB_CODE_SUCCESS;
}
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId, SArray* pPageIdList) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId));
pSource->src.pBlock = pBlock;
pSource->pageIdList = pPageIdList;
taosArrayPush(pAllSources, &pSource);
(*sourceId) += 1;
......@@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
}
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while(start < pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
......@@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return terrno;
}
taosArrayPush(pPageIdList, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
......@@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}
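Because the paged buffer no longer keeps per-group page lists (see the tpagedbuf.c hunks at the end of this commit), the sort handle now collects page ids itself and hands the list to doAddNewExternalMemSource. A hedged sketch of the caller-side pattern from doAddToBuf/doInternalMergeSort; the loop condition and the group id passed to getNewBufPage are assumptions.

```c
/* Sketch: caller-maintained page-id list for one external-memory sort source. */
static int32_t spillToSourceSketch(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool moreRowsToSpill) {
  SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));

  while (moreRowsToSpill) {
    int32_t pageId = -1;
    void*   pPage  = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); /* group id: assumption */
    (void)pPage;
    taosArrayPush(pPageIdList, &pageId);   /* remember where this chunk went */
    /* ... copy the next sorted chunk into pPage, mark it dirty, release it ... */
    moreRowsToSpill = false;               /* placeholder to end the sketch loop */
  }

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}
```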
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
......@@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code;
}
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) {
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) {
......@@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return terrno;
}
taosArrayPush(pPageIdList, &pageId);
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
......@@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
}
sortComparClearup(&pHandle->cmpParam);
sortComparCleanup(&pHandle->cmpParam);
tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != 0) {
return code;
}
......
......@@ -30,7 +30,7 @@
// return RUN_ALL_TESTS();
// }
TEST(testCase, tSimpleHashTest) {
TEST(testCase, tSimpleHashTest_intKey) {
SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
......@@ -57,12 +57,14 @@ TEST(testCase, tSimpleHashTest) {
int32_t iter = 0;
int64_t keySum = 0;
int64_t dataSum = 0;
size_t kLen = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, NULL);
void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
keySum += *(int64_t *)key;
dataSum += *(int64_t *)data;
}
ASSERT_EQ(keySum, dataSum);
ASSERT_EQ(keySum, originKeySum);
......@@ -74,4 +76,69 @@ TEST(testCase, tSimpleHashTest) {
tSimpleHashCleanup(pHashObj);
}
TEST(testCase, tSimpleHashTest_binaryKey) {
SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
assert(pHashObj != nullptr);
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
typedef struct {
int64_t suid;
int64_t uid;
} SCombineKey;
size_t keyLen = sizeof(SCombineKey);
size_t dataLen = sizeof(int64_t);
int64_t originDataSum = 0;
SCombineKey combineKey = {0};
for (int64_t i = 1; i <= 100; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen);
originDataSum += i;
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
}
for (int64_t i = 1; i <= 100; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
void *data = tSimpleHashGet(pHashObj, (const void *)&combineKey, keyLen);
ASSERT_EQ(i, *(int64_t *)data);
}
void *data = NULL;
int32_t iter = 0;
int64_t keySum = 0;
int64_t dataSum = 0;
size_t kLen = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
dataSum += *(int64_t *)data;
}
ASSERT_EQ(originDataSum, dataSum);
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, &kLen);
ASSERT_EQ(keyLen, kLen);
}
for (int64_t i = 1; i <= 99; ++i) {
combineKey.suid = i;
combineKey.uid = i + 1;
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj));
}
tSimpleHashCleanup(pHashObj);
}
#pragma GCC diagnostic pop
\ No newline at end of file
......@@ -51,20 +51,20 @@ struct tMemBucket;
typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value);
typedef struct tMemBucket {
int16_t numOfSlots;
int16_t type;
int16_t bytes;
int32_t total;
int32_t elemPerPage; // number of elements for each object
int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result
int32_t bufPageSize; // disk page size
MinMaxEntry range; // value range
int32_t times; // count that has been checked for deciding the correct data value buckets.
__compar_fn_t comparFn;
tMemBucketSlot * pSlots;
SDiskbasedBuf *pBuffer;
__perc_hash_func_t hashFunc;
int16_t numOfSlots;
int16_t type;
int16_t bytes;
int32_t total;
int32_t elemPerPage; // number of elements for each object
int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result
int32_t bufPageSize; // disk page size
MinMaxEntry range; // value range
int32_t times; // count that has been checked for deciding the correct data value buckets.
__compar_fn_t comparFn;
tMemBucketSlot* pSlots;
SDiskbasedBuf* pBuffer;
__perc_hash_func_t hashFunc;
SHashObj* groupPagesMap; // disk page map for different groups;
} tMemBucket;
tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval);
......
......@@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int16_t type = pCtx->input.pData[0]->info.type;
int16_t type = pCtx->pExpr->base.resSchema.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......
......@@ -33,13 +33,13 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage));
int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
SArray* pIdList = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
int32_t offset = 0;
for(int32_t i = 0; i < list->size; ++i) {
struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i);
for(int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) {
int32_t* pageId = taosArrayGet(pIdList, i);
SFilePage* pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
SFilePage* pg = getBufPage(pMemBucket->pBuffer, *pageId);
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes);
......@@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) {
}
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
SArray* list = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
assert(list->size == 1);
struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0);
SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
int32_t* pageId = taosArrayGet(list, 0);
SFilePage* pPage = getBufPage(pMemBucket->pBuffer, *pageId);
assert(pPage->num == 1);
double v = 0;
......@@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
pBucket->times = 1;
pBucket->maxCapacity = 200000;
pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) {
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
taosMemoryFree(pBucket);
......@@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return;
}
void* p = taosHashIterate(pBucket->groupPagesMap, NULL);
while(p) {
SArray** p1 = p;
p = taosHashIterate(pBucket->groupPagesMap, p);
taosArrayDestroy(*p1);
}
destroyDiskbasedBuf(pBucket->pBuffer);
taosMemoryFreeClear(pBucket->pSlots);
taosHashCleanup(pBucket->groupPagesMap);
taosMemoryFreeClear(pBucket);
}
......@@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = NULL;
}
SArray* pPageIdList = (SArray*)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
if (pPageIdList == NULL) {
SArray* pList = taosArrayInit(4, sizeof(int32_t));
taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES);
pPageIdList = pList;
}
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
}
memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes);
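tMemBucket now tracks its pages through its own groupPagesMap. Note that the map stores the SArray pointer by value (POINTER_BYTES), so readers such as loadDataFromFilePage earlier in this commit fetch it with a double dereference. A hedged sketch of the lazily-created list and the matching lookup:

```c
/* Sketch: create the per-group page-id list on first use, then push a new
 * page id into it. Matches the taosHashPut(..., &pList, POINTER_BYTES) call
 * above; the lookup therefore yields an SArray**. */
static void trackPageSketch(tMemBucket* pBucket, int32_t groupId, int32_t pageId) {
  SArray** ppList = (SArray**)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
  if (ppList == NULL) {
    SArray* pList = taosArrayInit(4, sizeof(int32_t));
    taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES);
    ppList = (SArray**)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId));
  }
  taosArrayPush(*ppList, &pageId);
}
```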
......@@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
resetSlotInfo(pMemBucket);
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1);
SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId);
SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
assert(list->size > 0);
for (int32_t f = 0; f < list->size; ++f) {
......
......@@ -84,6 +84,7 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
......@@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
......@@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
......@@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc));
}
return 0;
}
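The new aggregate-merge path resolves a "<func>_merge" symbol alongside the existing "_start"/"_finish" ones. As written, the hunk above reuses finishFuncName as its scratch buffer; below is a sketch of the construction one would expect into the freshly declared mergeFuncName (an assumption on our part, not what this diff contains).

```c
/* Sketch (assumption): build "<processFuncName>_merge" in mergeFuncName and
 * resolve it, mirroring how the _finish symbol is resolved just above. */
char  mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, strlen(processFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc));
```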
......
......@@ -33,7 +33,7 @@ struct SDiskbasedBuf {
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SList* freePgList; // free page list
SHashObj* groupSet; // id hash table, todo remove it
SArray* pIdList; // page id list
SHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
......@@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return 0;
}
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
assert(taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)) == NULL);
SArray* pa = taosArrayInit(1, POINTER_BYTES);
int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES);
assert(ret == 0);
return pa;
}
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
SIDList list = NULL;
char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
list = addNewGroup(pBuf, groupId);
} else {
list = (SIDList)(*p);
}
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
......@@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
ppi->pn = NULL;
ppi->dirty = false;
return *(SPageInfo**)taosArrayPush(list, &ppi);
return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi);
}
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
......@@ -293,16 +274,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
}
}
// int32_t pos = listNEles(pBuf->lruList);
// SListIter iter1 = {0};
// tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD);
// SListNode* pn1 = NULL;
// while((pn1 = tdListNext(&iter1)) != NULL) {
// SPageInfo* pageInfo = *(SPageInfo**) pn1->data;
// printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1);
// pos -= 1;
// }
return pn;
}
......@@ -382,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
// init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pPBuf->groupSet = taosHashInit(10, fn, true, false);
pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
pPBuf->all = taosHashInit(10, fn, true, false);
......@@ -425,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
*pageId = (++pBuf->allocateId);
// register page id info
pi = registerPage(pBuf, groupId, *pageId);
pi = registerPage(pBuf, *pageId);
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
......@@ -526,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
pBuf->statis.releasePages += 1;
}
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); }
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
assert(pBuf != NULL);
char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
if (p == NULL) { // it is a new group id
return pBuf->emptyDummyIdList;
} else {
return (SArray*)(*p);
}
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
ASSERT(pBuf != NULL);
return pBuf->pIdList;
}
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
......@@ -578,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosRemoveFile(pBuf->path);
taosMemoryFreeClear(pBuf->path);
SArray** p = taosHashIterate(pBuf->groupSet, NULL);
while (p) {
size_t n = taosArrayGetSize(*p);
for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(*p, i);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(*p);
p = taosHashIterate(pBuf->groupSet, p);
size_t n = taosArrayGetSize(pBuf->pIdList);
for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(pBuf->pIdList);
tdListFree(pBuf->lruList);
tdListFree(pBuf->freePgList);
taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree);
taosHashCleanup(pBuf->groupSet);
taosHashCleanup(pBuf->all);
taosMemoryFreeClear(pBuf->id);
......@@ -661,32 +620,32 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
printf(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
ps->loadBytes / (1024.0 * ps->loadPages));
if (ps->loadPages > 0) {
printf(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} else {
printf("no page loaded\n");
}
}
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
SArray** p = taosHashIterate(pBuf->groupSet, NULL);
while (p) {
size_t n = taosArrayGetSize(*p);
for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(*p, i);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(*p);
p = taosHashIterate(pBuf->groupSet, p);
size_t n = taosArrayGetSize(pBuf->pIdList);
for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayClear(pBuf->pIdList);
tdListEmpty(pBuf->lruList);
tdListEmpty(pBuf->freePgList);
taosArrayClear(pBuf->emptyDummyIdList);
taosArrayClear(pBuf->pFree);
taosHashClear(pBuf->groupSet);
taosHashClear(pBuf->all);
pBuf->numOfPages = 0; // all pages are in buffer in the first place
......