提交 0e146b78 编写于 作者: S slguan

Merge branch 'develop' into feature/mpeer

[submodule "src/connector/go"]
path = src/connector/go
url = https://github.com/taosdata/driver-go
...@@ -105,7 +105,7 @@ RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接 ...@@ -105,7 +105,7 @@ RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接
**shellActivityTimer** **shellActivityTimer**
- 默认值:3 - 默认值:3
系统在服务端保持结果集的最长时间,范围[1-120]。 系统在服务端保持结果集的最长时间,单位:秒,范围[1-120]。
**maxUsers** **maxUsers**
- 默认值:10,000 - 默认值:10,000
...@@ -138,7 +138,7 @@ RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接 ...@@ -138,7 +138,7 @@ RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接
系统(服务端和客户端)运行日志开关: 系统(服务端和客户端)运行日志开关:
- 131 仅输出错误和警告信息 - 131 仅输出错误和警告信息
- 135 输错误(ERROR)、警告(WARN)、信息(Info) - 135 输错误(ERROR)、警告(WARN)、信息(Info)
不同应用场景的数据往往具有不同的数据特征,比如保留天数、副本数、采集频次、记录大小、采集点的数量、压缩等都可完全不同。为获得在存储上的最高效率,TDengine提供如下存储相关的系统配置参数: 不同应用场景的数据往往具有不同的数据特征,比如保留天数、副本数、采集频次、记录大小、采集点的数量、压缩等都可完全不同。为获得在存储上的最高效率,TDengine提供如下存储相关的系统配置参数:
......
...@@ -3578,121 +3578,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3578,121 +3578,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
/*
* Compare two strings
* TSDB_MATCH: Match
* TSDB_NOMATCH: No match
* TSDB_NOWILDCARDMATCH: No match in spite of having * or % wildcards.
* Like matching rules:
* '%': Matches zero or more characters
* '_': Matches one character
*
*/
int patternMatch(const char *patterStr, const char *str, size_t size, const SPatternCompareInfo *pInfo) {
char c, c1;
int32_t i = 0;
int32_t j = 0;
while ((c = patterStr[i++]) != 0) {
if (c == pInfo->matchAll) { /* Match "*" */
while ((c = patterStr[i++]) == pInfo->matchAll || c == pInfo->matchOne) {
if (c == pInfo->matchOne && (j > size || str[j++] == 0)) {
// empty string, return not match
return TSDB_PATTERN_NOWILDCARDMATCH;
}
}
if (c == 0) {
return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */
}
char next[3] = {toupper(c), tolower(c), 0};
while (1) {
size_t n = strcspn(str, next);
str += n;
if (str[0] == 0 || (n >= size - 1)) {
break;
}
int32_t ret = patternMatch(&patterStr[i], ++str, size - n - 1, pInfo);
if (ret != TSDB_PATTERN_NOMATCH) {
return ret;
}
}
return TSDB_PATTERN_NOWILDCARDMATCH;
}
c1 = str[j++];
if (j <= size) {
if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) {
continue;
}
}
return TSDB_PATTERN_NOMATCH;
}
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) {
wchar_t c, c1;
wchar_t matchOne = L'_'; // "_"
wchar_t matchAll = L'%'; // "%"
int32_t i = 0;
int32_t j = 0;
while ((c = patterStr[i++]) != 0) {
if (c == matchAll) { /* Match "%" */
while ((c = patterStr[i++]) == matchAll || c == matchOne) {
if (c == matchOne && (j > size || str[j++] == 0)) {
return TSDB_PATTERN_NOWILDCARDMATCH;
}
}
if (c == 0) {
return TSDB_PATTERN_MATCH;
}
wchar_t accept[3] = {towupper(c), towlower(c), 0};
while (1) {
size_t n = wcsspn(str, accept);
str += n;
if (str[0] == 0 || (n >= size - 1)) {
break;
}
str++;
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo);
if (ret != TSDB_PATTERN_NOMATCH) {
return ret;
}
}
return TSDB_PATTERN_NOWILDCARDMATCH;
}
c1 = str[j++];
if (j <= size) {
if (c == c1 || towlower(c) == towlower(c1) || (c == matchOne && c1 != 0)) {
continue;
}
}
return TSDB_PATTERN_NOMATCH;
}
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max, static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) { int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
*min = INT64_MAX; *min = INT64_MAX;
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "ttokendef.h" #include "ttokendef.h"
#include "name.h" #include "name.h"
#include "tcompare.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
......
...@@ -191,7 +191,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -191,7 +191,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
......
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import "C"
import (
"context"
"errors"
"database/sql/driver"
"unsafe"
"strconv"
"strings"
"time"
)
type taosConn struct {
taos unsafe.Pointer
affectedRows int
insertId int
cfg *config
status statusFlag
parseTime bool
reset bool // set when the Go SQL package calls ResetSession
}
type taosSqlResult struct {
affectedRows int64
insertId int64
}
func (res *taosSqlResult) LastInsertId() (int64, error) {
return res.insertId, nil
}
func (res *taosSqlResult) RowsAffected() (int64, error) {
return res.affectedRows, nil
}
func (mc *taosConn) Begin() (driver.Tx, error) {
taosLog.Println("taosSql not support transaction")
return nil, errors.New("taosSql not support transaction")
}
func (mc *taosConn) Close() (err error) {
if mc.taos == nil {
return errConnNoExist
}
mc.taos_close()
return nil
}
func (mc *taosConn) Prepare(query string) (driver.Stmt, error) {
if mc.taos == nil {
return nil, errInvalidConn
}
stmt := &taosSqlStmt{
mc: mc,
pSql: query,
}
// find ? count and save to stmt.paramCount
stmt.paramCount = strings.Count(query, "?")
//fmt.Printf("prepare alloc stmt:%p, sql:%s\n", stmt, query)
taosLog.Printf("prepare alloc stmt:%p, sql:%s\n", stmt, query)
return stmt, nil
}
func (mc *taosConn) interpolateParams(query string, args []driver.Value) (string, error) {
// Number of ? should be same to len(args)
if strings.Count(query, "?") != len(args) {
return "", driver.ErrSkip
}
buf := make([]byte, defaultBufSize)
buf = buf[:0] // clear buf
argPos := 0
for i := 0; i < len(query); i++ {
q := strings.IndexByte(query[i:], '?')
if q == -1 {
buf = append(buf, query[i:]...)
break
}
buf = append(buf, query[i:i+q]...)
i += q
arg := args[argPos]
argPos++
if arg == nil {
buf = append(buf, "NULL"...)
continue
}
switch v := arg.(type) {
case int64:
buf = strconv.AppendInt(buf, v, 10)
case uint64:
// Handle uint64 explicitly because our custom ConvertValue emits unsigned values
buf = strconv.AppendUint(buf, v, 10)
case float64:
buf = strconv.AppendFloat(buf, v, 'g', -1, 64)
case bool:
if v {
buf = append(buf, '1')
} else {
buf = append(buf, '0')
}
case time.Time:
if v.IsZero() {
buf = append(buf, "'0000-00-00'"...)
} else {
v := v.In(mc.cfg.loc)
v = v.Add(time.Nanosecond * 500) // To round under microsecond
year := v.Year()
year100 := year / 100
year1 := year % 100
month := v.Month()
day := v.Day()
hour := v.Hour()
minute := v.Minute()
second := v.Second()
micro := v.Nanosecond() / 1000
buf = append(buf, []byte{
'\'',
digits10[year100], digits01[year100],
digits10[year1], digits01[year1],
'-',
digits10[month], digits01[month],
'-',
digits10[day], digits01[day],
' ',
digits10[hour], digits01[hour],
':',
digits10[minute], digits01[minute],
':',
digits10[second], digits01[second],
}...)
if micro != 0 {
micro10000 := micro / 10000
micro100 := micro / 100 % 100
micro1 := micro % 100
buf = append(buf, []byte{
'.',
digits10[micro10000], digits01[micro10000],
digits10[micro100], digits01[micro100],
digits10[micro1], digits01[micro1],
}...)
}
buf = append(buf, '\'')
}
case []byte:
if v == nil {
buf = append(buf, "NULL"...)
} else {
buf = append(buf, "_binary'"...)
if mc.status&statusNoBackslashEscapes == 0 {
buf = escapeBytesBackslash(buf, v)
} else {
buf = escapeBytesQuotes(buf, v)
}
buf = append(buf, '\'')
}
case string:
//buf = append(buf, '\'')
if mc.status&statusNoBackslashEscapes == 0 {
buf = escapeStringBackslash(buf, v)
} else {
buf = escapeStringQuotes(buf, v)
}
//buf = append(buf, '\'')
default:
return "", driver.ErrSkip
}
//if len(buf)+4 > mc.maxAllowedPacket {
if len(buf)+4 > maxTaosSqlLen {
return "", driver.ErrSkip
}
}
if argPos != len(args) {
return "", driver.ErrSkip
}
return string(buf), nil
}
func (mc *taosConn) Exec(query string, args []driver.Value) (driver.Result, error) {
if mc.taos == nil {
return nil, driver.ErrBadConn
}
if len(args) != 0 {
if !mc.cfg.interpolateParams {
return nil, driver.ErrSkip
}
// try to interpolate the parameters to save extra roundtrips for preparing and closing a statement
prepared, err := mc.interpolateParams(query, args)
if err != nil {
return nil, err
}
query = prepared
}
mc.affectedRows = 0
mc.insertId = 0
_, err := mc.taosQuery(query)
if err == nil {
return &taosSqlResult{
affectedRows: int64(mc.affectedRows),
insertId: int64(mc.insertId),
}, err
}
return nil, err
}
func (mc *taosConn) Query(query string, args []driver.Value) (driver.Rows, error) {
return mc.query(query, args)
}
func (mc *taosConn) query(query string, args []driver.Value) (*textRows, error) {
if mc.taos == nil {
return nil, driver.ErrBadConn
}
if len(args) != 0 {
if !mc.cfg.interpolateParams {
return nil, driver.ErrSkip
}
// try client-side prepare to reduce roundtrip
prepared, err := mc.interpolateParams(query, args)
if err != nil {
return nil, err
}
query = prepared
}
num_fields, err := mc.taosQuery(query)
if err == nil {
// Read Result
rows := new(textRows)
rows.mc = mc
// Columns field
rows.rs.columns, err = mc.readColumns(num_fields)
return rows, err
}
return nil, err
}
// Ping implements driver.Pinger interface
func (mc *taosConn) Ping(ctx context.Context) (err error) {
if mc.taos != nil {
return nil
}
return errInvalidConn
}
// BeginTx implements driver.ConnBeginTx interface
func (mc *taosConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
taosLog.Println("taosSql not support transaction")
return nil, errors.New("taosSql not support transaction")
}
func (mc *taosConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
if mc.taos == nil {
return nil, errInvalidConn
}
dargs, err := namedValueToValue(args)
if err != nil {
return nil, err
}
rows, err := mc.query(query, dargs)
if err != nil {
return nil, err
}
return rows, err
}
func (mc *taosConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
if mc.taos == nil {
return nil, errInvalidConn
}
dargs, err := namedValueToValue(args)
if err != nil {
return nil, err
}
return mc.Exec(query, dargs)
}
func (mc *taosConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
if mc.taos == nil {
return nil, errInvalidConn
}
stmt, err := mc.Prepare(query)
if err != nil {
return nil, err
}
return stmt, nil
}
func (stmt *taosSqlStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
if stmt.mc == nil {
return nil, errInvalidConn
}
dargs, err := namedValueToValue(args)
if err != nil {
return nil, err
}
rows, err := stmt.query(dargs)
if err != nil {
return nil, err
}
return rows, err
}
func (stmt *taosSqlStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
if stmt.mc == nil {
return nil, errInvalidConn
}
dargs, err := namedValueToValue(args)
if err != nil {
return nil, err
}
return stmt.Exec(dargs)
}
func (mc *taosConn) CheckNamedValue(nv *driver.NamedValue) (err error) {
nv.Value, err = converter{}.ConvertValue(nv.Value)
return
}
// ResetSession implements driver.SessionResetter.
// (From Go 1.10)
func (mc *taosConn) ResetSession(ctx context.Context) error {
if mc.taos == nil {
return driver.ErrBadConn
}
mc.reset = true
return nil
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import (
"context"
"database/sql/driver"
)
type connector struct {
cfg *config
}
// Connect implements driver.Connector interface.
// Connect returns a connection to the database.
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
var err error
// New taosConn
mc := &taosConn{
cfg: c.cfg,
parseTime: c.cfg.parseTime,
}
// Connect to Server
mc.taos, err = mc.taosConnect(mc.cfg.addr, mc.cfg.user, mc.cfg.passwd, mc.cfg.dbName, mc.cfg.port)
if err != nil {
return nil, err
}
return mc, nil
}
// Driver implements driver.Connector interface.
// Driver returns &taosSQLDriver{}.
func (c *connector) Driver() driver.Driver {
return &taosSQLDriver{}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
const (
timeFormat = "2006-01-02 15:04:05"
maxTaosSqlLen = 65380
defaultBufSize = maxTaosSqlLen + 32
)
type fieldType byte
type fieldFlag uint16
const (
flagNotNULL fieldFlag = 1 << iota
)
type statusFlag uint16
const (
statusInTrans statusFlag = 1 << iota
statusInAutocommit
statusReserved // Not in documentation
statusMoreResultsExists
statusNoGoodIndexUsed
statusNoIndexUsed
statusCursorExists
statusLastRowSent
statusDbDropped
statusNoBackslashEscapes
statusMetadataChanged
statusQueryWasSlow
statusPsOutParams
statusInTransReadonly
statusSessionStateChanged
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import (
"context"
"database/sql"
"database/sql/driver"
)
// taosSqlDriver is exported to make the driver directly accessible.
// In general the driver is used via the database/sql package.
type taosSQLDriver struct{}
// Open new Connection.
// the DSN string is formatted
func (d taosSQLDriver) Open(dsn string) (driver.Conn, error) {
cfg, err := parseDSN(dsn)
if err != nil {
return nil, err
}
c := &connector{
cfg: cfg,
}
return c.Connect(context.Background())
}
func init() {
sql.Register("taosSql", &taosSQLDriver{})
taosLogInit()
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import (
"errors"
"net/url"
"strconv"
"strings"
"time"
)
var (
errInvalidDSNUnescaped = errors.New("invalid DSN: did you forget to escape a param value?")
errInvalidDSNAddr = errors.New("invalid DSN: network address not terminated (missing closing brace)")
errInvalidDSNPort = errors.New("invalid DSN: network port is not a valid number")
errInvalidDSNNoSlash = errors.New("invalid DSN: missing the slash separating the database name")
)
// Config is a configuration parsed from a DSN string.
// If a new Config is created instead of being parsed from a DSN string,
// the NewConfig function should be used, which sets default values.
type config struct {
user string // Username
passwd string // Password (requires User)
net string // Network type
addr string // Network address (requires Net)
port int
dbName string // Database name
params map[string]string // Connection parameters
loc *time.Location // Location for time.Time values
columnsWithAlias bool // Prepend table alias to column names
interpolateParams bool // Interpolate placeholders into query string
parseTime bool // Parse time values to time.Time
}
// NewConfig creates a new Config and sets default values.
func newConfig() *config {
return &config{
loc: time.UTC,
interpolateParams: true,
parseTime: true,
}
}
// ParseDSN parses the DSN string to a Config
func parseDSN(dsn string) (cfg *config, err error) {
taosLog.Println("input dsn:", dsn)
// New config with some default values
cfg = newConfig()
// [user[:password]@][net[(addr)]]/dbname[?param1=value1&paramN=valueN]
// Find the last '/' (since the password or the net addr might contain a '/')
foundSlash := false
for i := len(dsn) - 1; i >= 0; i-- {
if dsn[i] == '/' {
foundSlash = true
var j, k int
// left part is empty if i <= 0
if i > 0 {
// [username[:password]@][protocol[(address)]]
// Find the last '@' in dsn[:i]
for j = i; j >= 0; j-- {
if dsn[j] == '@' {
// username[:password]
// Find the first ':' in dsn[:j]
for k = 0; k < j; k++ {
if dsn[k] == ':' {
cfg.passwd = dsn[k+1 : j]
break
}
}
cfg.user = dsn[:k]
break
}
}
// [protocol[(address)]]
// Find the first '(' in dsn[j+1:i]
for k = j + 1; k < i; k++ {
if dsn[k] == '(' {
// dsn[i-1] must be == ')' if an address is specified
if dsn[i-1] != ')' {
if strings.ContainsRune(dsn[k+1:i], ')') {
return nil, errInvalidDSNUnescaped
}
return nil, errInvalidDSNAddr
}
strs := strings.Split(dsn[k+1:i-1], ":")
if len(strs) == 1 {
return nil, errInvalidDSNAddr
}
cfg.addr = strs[0]
cfg.port, err = strconv.Atoi(strs[1])
if err != nil {
return nil, errInvalidDSNPort
}
break
}
}
cfg.net = dsn[j+1 : k]
}
// dbname[?param1=value1&...&paramN=valueN]
// Find the first '?' in dsn[i+1:]
for j = i + 1; j < len(dsn); j++ {
if dsn[j] == '?' {
if err = parseDSNParams(cfg, dsn[j+1:]); err != nil {
return
}
break
}
}
cfg.dbName = dsn[i+1 : j]
break
}
}
if !foundSlash && len(dsn) > 0 {
return nil, errInvalidDSNNoSlash
}
taosLog.Printf("cfg info: %+v", cfg)
return
}
// parseDSNParams parses the DSN "query string"
// Values must be url.QueryEscape'ed
func parseDSNParams(cfg *config, params string) (err error) {
for _, v := range strings.Split(params, "&") {
param := strings.SplitN(v, "=", 2)
if len(param) != 2 {
continue
}
// cfg params
switch value := param[1]; param[0] {
case "columnsWithAlias":
var isBool bool
cfg.columnsWithAlias, isBool = readBool(value)
if !isBool {
return errors.New("invalid bool value: " + value)
}
// Enable client side placeholder substitution
case "interpolateParams":
var isBool bool
cfg.interpolateParams, isBool = readBool(value)
if !isBool {
return errors.New("invalid bool value: " + value)
}
// Time Location
case "loc":
if value, err = url.QueryUnescape(value); err != nil {
return
}
cfg.loc, err = time.LoadLocation(value)
if err != nil {
return
}
// time.Time parsing
case "parseTime":
var isBool bool
cfg.parseTime, isBool = readBool(value)
if !isBool {
return errors.New("invalid bool value: " + value)
}
default:
// lazy init
if cfg.params == nil {
cfg.params = make(map[string]string)
}
if cfg.params[param[0]], err = url.QueryUnescape(value); err != nil {
return
}
}
}
return
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
/*
#cgo CFLAGS : -I/usr/include
#cgo LDFLAGS: -L/usr/lib -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
*/
import "C"
import (
"database/sql/driver"
"errors"
"fmt"
"io"
"strconv"
"time"
"unsafe"
)
/******************************************************************************
* Result *
******************************************************************************/
// Read Packets as Field Packets until EOF-Packet or an Error appears
func (mc *taosConn) readColumns(count int) ([]taosSqlField, error) {
columns := make([]taosSqlField, count)
var result unsafe.Pointer
result = C.taos_use_result(mc.taos)
if result == nil {
return nil, errors.New("invalid result")
}
pFields := (*C.struct_taosField)(C.taos_fetch_fields(result))
// TODO: Optimized rewriting !!!!
fields := (*[1 << 30]C.struct_taosField)(unsafe.Pointer(pFields))
for i := 0; i < count; i++ {
//columns[i].tableName = ms.taos.
//fmt.Println(reflect.TypeOf(fields[i].name))
var charray []byte
for j := range fields[i].name {
//fmt.Println("fields[i].name[j]: ", fields[i].name[j])
if fields[i].name[j] != 0 {
charray = append(charray, byte(fields[i].name[j]))
} else {
break
}
}
columns[i].name = string(charray)
columns[i].length = (uint32)(fields[i].bytes)
columns[i].fieldType = fieldType(fields[i]._type)
columns[i].flags = 0
// columns[i].decimals = 0
//columns[i].charSet = 0
}
return columns, nil
}
func (rows *taosSqlRows) readRow(dest []driver.Value) error {
mc := rows.mc
if rows.rs.done || mc == nil {
return io.EOF
}
var result unsafe.Pointer
result = C.taos_use_result(mc.taos)
if result == nil {
return errors.New(C.GoString(C.taos_errstr(mc.taos)))
}
//var row *unsafe.Pointer
row := C.taos_fetch_row(result)
if row == nil {
rows.rs.done = true
C.taos_free_result(result)
rows.mc = nil
return io.EOF
}
// because sizeof(void*) == sizeof(int*) == 8
// notes: sizeof(int) == 8 in go, but sizeof(int) == 4 in C.
for i := range dest {
currentRow := (unsafe.Pointer)(uintptr(*((*int)(unsafe.Pointer(uintptr(unsafe.Pointer(row)) + uintptr(i)*unsafe.Sizeof(int(0)))))))
if currentRow == nil {
dest[i] = nil
continue
}
switch rows.rs.columns[i].fieldType {
case C.TSDB_DATA_TYPE_BOOL:
if (*((*byte)(currentRow))) != 0 {
dest[i] = true
} else {
dest[i] = false
}
break
case C.TSDB_DATA_TYPE_TINYINT:
dest[i] = (int)(*((*byte)(currentRow)))
break
case C.TSDB_DATA_TYPE_SMALLINT:
dest[i] = (int16)(*((*int16)(currentRow)))
break
case C.TSDB_DATA_TYPE_INT:
dest[i] = (int)(*((*int32)(currentRow))) // notes int32 of go <----> int of C
break
case C.TSDB_DATA_TYPE_BIGINT:
dest[i] = (int64)(*((*int64)(currentRow)))
break
case C.TSDB_DATA_TYPE_FLOAT:
dest[i] = (*((*float32)(currentRow)))
break
case C.TSDB_DATA_TYPE_DOUBLE:
dest[i] = (*((*float64)(currentRow)))
break
case C.TSDB_DATA_TYPE_BINARY, C.TSDB_DATA_TYPE_NCHAR:
charLen := rows.rs.columns[i].length
var index uint32
binaryVal := make([]byte, charLen)
for index = 0; index < charLen; index++ {
binaryVal[index] = *((*byte)(unsafe.Pointer(uintptr(currentRow) + uintptr(index))))
}
dest[i] = string(binaryVal[:])
break
case C.TSDB_DATA_TYPE_TIMESTAMP:
if mc.cfg.parseTime == true {
timestamp := (int64)(*((*int64)(currentRow)))
dest[i] = timestampConvertToString(timestamp, int(C.taos_result_precision(result)))
} else {
dest[i] = (int64)(*((*int64)(currentRow)))
}
break
default:
fmt.Println("default fieldType: set dest[] to nil")
dest[i] = nil
break
}
}
return nil
}
// Read result as Field format until all rows or an Error appears
// call this func in conn mode
func (rows *textRows) readRow(dest []driver.Value) error {
return rows.taosSqlRows.readRow(dest)
}
// call thsi func in stmt mode
func (rows *binaryRows) readRow(dest []driver.Value) error {
return rows.taosSqlRows.readRow(dest)
}
func timestampConvertToString(timestamp int64, precision int) string {
var decimal, sVal, nsVal int64
if precision == 0 {
decimal = timestamp % 1000
sVal = timestamp / 1000
nsVal = decimal * 1000
} else {
decimal = timestamp % 1000000
sVal = timestamp / 1000000
nsVal = decimal * 1000000
}
date_time := time.Unix(sVal, nsVal)
//const base_format = "2006-01-02 15:04:05"
str_time := date_time.Format(timeFormat)
return (str_time + "." + strconv.Itoa(int(decimal)))
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
/*
#cgo CFLAGS : -I/usr/include
#cgo LDFLAGS: -L/usr/lib -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
*/
import "C"
import (
"database/sql"
"database/sql/driver"
"io"
"math"
"reflect"
)
type taosSqlField struct {
tableName string
name string
length uint32
flags fieldFlag // indicate whether this field can is null
fieldType fieldType
decimals byte
charSet uint8
}
type resultSet struct {
columns []taosSqlField
columnNames []string
done bool
}
type taosSqlRows struct {
mc *taosConn
rs resultSet
}
type binaryRows struct {
taosSqlRows
}
type textRows struct {
taosSqlRows
}
func (rows *taosSqlRows) Columns() []string {
if rows.rs.columnNames != nil {
return rows.rs.columnNames
}
columns := make([]string, len(rows.rs.columns))
if rows.mc != nil && rows.mc.cfg.columnsWithAlias {
for i := range columns {
if tableName := rows.rs.columns[i].tableName; len(tableName) > 0 {
columns[i] = tableName + "." + rows.rs.columns[i].name
} else {
columns[i] = rows.rs.columns[i].name
}
}
} else {
for i := range columns {
columns[i] = rows.rs.columns[i].name
}
}
rows.rs.columnNames = columns
return columns
}
func (rows *taosSqlRows) ColumnTypeDatabaseTypeName(i int) string {
return rows.rs.columns[i].typeDatabaseName()
}
func (rows *taosSqlRows) ColumnTypeLength(i int) (length int64, ok bool) {
return int64(rows.rs.columns[i].length), true
}
func (rows *taosSqlRows) ColumnTypeNullable(i int) (nullable, ok bool) {
return rows.rs.columns[i].flags&flagNotNULL == 0, true
}
func (rows *taosSqlRows) ColumnTypePrecisionScale(i int) (int64, int64, bool) {
column := rows.rs.columns[i]
decimals := int64(column.decimals)
switch column.fieldType {
case C.TSDB_DATA_TYPE_FLOAT:
fallthrough
case C.TSDB_DATA_TYPE_DOUBLE:
if decimals == 0x1f {
return math.MaxInt64, math.MaxInt64, true
}
return math.MaxInt64, decimals, true
}
return 0, 0, false
}
func (rows *taosSqlRows) ColumnTypeScanType(i int) reflect.Type {
return rows.rs.columns[i].scanType()
}
func (rows *taosSqlRows) Close() error {
if rows.mc != nil {
result := C.taos_use_result(rows.mc.taos)
if result != nil {
C.taos_free_result(result)
}
rows.mc = nil
}
return nil
}
func (rows *taosSqlRows) HasNextResultSet() (b bool) {
if rows.mc == nil {
return false
}
return rows.mc.status&statusMoreResultsExists != 0
}
func (rows *taosSqlRows) nextResultSet() (int, error) {
if rows.mc == nil {
return 0, io.EOF
}
// Remove unread packets from stream
if !rows.rs.done {
rows.rs.done = true
}
if !rows.HasNextResultSet() {
rows.mc = nil
return 0, io.EOF
}
rows.rs = resultSet{}
return 0,nil
}
func (rows *taosSqlRows) nextNotEmptyResultSet() (int, error) {
for {
resLen, err := rows.nextResultSet()
if err != nil {
return 0, err
}
if resLen > 0 {
return resLen, nil
}
rows.rs.done = true
}
}
func (rows *binaryRows) NextResultSet() error {
resLen, err := rows.nextNotEmptyResultSet()
if err != nil {
return err
}
rows.rs.columns, err = rows.mc.readColumns(resLen)
return err
}
// stmt.Query return binary rows, and get row from this func
func (rows *binaryRows) Next(dest []driver.Value) error {
if mc := rows.mc; mc != nil {
// Fetch next row from stream
return rows.readRow(dest)
}
return io.EOF
}
func (rows *textRows) NextResultSet() (err error) {
resLen, err := rows.nextNotEmptyResultSet()
if err != nil {
return err
}
rows.rs.columns, err = rows.mc.readColumns(resLen)
return err
}
// db.Query return text rows, and get row from this func
func (rows *textRows) Next(dest []driver.Value) error {
if mc := rows.mc; mc != nil {
// Fetch next row from stream
return rows.readRow(dest)
}
return io.EOF
}
func (mf *taosSqlField) typeDatabaseName() string {
//fmt.Println("######## (mf *taosSqlField) typeDatabaseName() mf.fieldType:", mf.fieldType)
switch mf.fieldType {
case C.TSDB_DATA_TYPE_BOOL:
return "BOOL"
case C.TSDB_DATA_TYPE_TINYINT:
return "TINYINT"
case C.TSDB_DATA_TYPE_SMALLINT:
return "SMALLINT"
case C.TSDB_DATA_TYPE_INT:
return "INT"
case C.TSDB_DATA_TYPE_BIGINT:
return "BIGINT"
case C.TSDB_DATA_TYPE_FLOAT:
return "FLOAT"
case C.TSDB_DATA_TYPE_DOUBLE:
return "DOUBLE"
case C.TSDB_DATA_TYPE_BINARY:
return "BINARY"
case C.TSDB_DATA_TYPE_NCHAR:
return "NCHAR"
case C.TSDB_DATA_TYPE_TIMESTAMP:
return "TIMESTAMP"
default:
return ""
}
}
var (
scanTypeFloat32 = reflect.TypeOf(float32(0))
scanTypeFloat64 = reflect.TypeOf(float64(0))
scanTypeInt8 = reflect.TypeOf(int8(0))
scanTypeInt16 = reflect.TypeOf(int16(0))
scanTypeInt32 = reflect.TypeOf(int32(0))
scanTypeInt64 = reflect.TypeOf(int64(0))
scanTypeNullTime = reflect.TypeOf(NullTime{})
scanTypeRawBytes = reflect.TypeOf(sql.RawBytes{})
scanTypeUnknown = reflect.TypeOf(new(interface{}))
)
func (mf *taosSqlField) scanType() reflect.Type {
//fmt.Println("######## (mf *taosSqlField) scanType() mf.fieldType:", mf.fieldType)
switch mf.fieldType {
case C.TSDB_DATA_TYPE_BOOL:
return scanTypeInt8
case C.TSDB_DATA_TYPE_TINYINT:
return scanTypeInt8
case C.TSDB_DATA_TYPE_SMALLINT:
return scanTypeInt16
case C.TSDB_DATA_TYPE_INT:
return scanTypeInt32
case C.TSDB_DATA_TYPE_BIGINT:
return scanTypeInt64
case C.TSDB_DATA_TYPE_FLOAT:
return scanTypeFloat32
case C.TSDB_DATA_TYPE_DOUBLE:
return scanTypeFloat64
case C.TSDB_DATA_TYPE_BINARY:
return scanTypeRawBytes
case C.TSDB_DATA_TYPE_NCHAR:
return scanTypeRawBytes
case C.TSDB_DATA_TYPE_TIMESTAMP:
return scanTypeNullTime
default:
return scanTypeUnknown
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import (
"database/sql/driver"
"fmt"
"reflect"
)
type taosSqlStmt struct {
mc *taosConn
id uint32
pSql string
paramCount int
}
func (stmt *taosSqlStmt) Close() error {
return nil
}
func (stmt *taosSqlStmt) NumInput() int {
return stmt.paramCount
}
func (stmt *taosSqlStmt) Exec(args []driver.Value) (driver.Result, error) {
if stmt.mc == nil || stmt.mc.taos == nil {
return nil, errInvalidConn
}
return stmt.mc.Exec(stmt.pSql, args)
}
func (stmt *taosSqlStmt) Query(args []driver.Value) (driver.Rows, error) {
if stmt.mc == nil || stmt.mc.taos == nil {
return nil, errInvalidConn
}
return stmt.query(args)
}
func (stmt *taosSqlStmt) query(args []driver.Value) (*binaryRows, error) {
mc := stmt.mc
if mc == nil || mc.taos == nil {
return nil, errInvalidConn
}
querySql := stmt.pSql
if len(args) != 0 {
if !mc.cfg.interpolateParams {
return nil, driver.ErrSkip
}
// try client-side prepare to reduce roundtrip
prepared, err := mc.interpolateParams(stmt.pSql, args)
if err != nil {
return nil, err
}
querySql = prepared
}
num_fields, err := mc.taosQuery(querySql)
if err == nil {
// Read Result
rows := new(binaryRows)
rows.mc = mc
// Columns field
rows.rs.columns, err = mc.readColumns(num_fields)
return rows, err
}
return nil, err
}
type converter struct{}
// ConvertValue mirrors the reference/default converter in database/sql/driver
// with _one_ exception. We support uint64 with their high bit and the default
// implementation does not. This function should be kept in sync with
// database/sql/driver defaultConverter.ConvertValue() except for that
// deliberate difference.
func (c converter) ConvertValue(v interface{}) (driver.Value, error) {
if driver.IsValue(v) {
return v, nil
}
if vr, ok := v.(driver.Valuer); ok {
sv, err := callValuerValue(vr)
if err != nil {
return nil, err
}
if !driver.IsValue(sv) {
return nil, fmt.Errorf("non-Value type %T returned from Value", sv)
}
return sv, nil
}
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Ptr:
// indirect pointers
if rv.IsNil() {
return nil, nil
} else {
return c.ConvertValue(rv.Elem().Interface())
}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return rv.Int(), nil
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return rv.Uint(), nil
case reflect.Float32, reflect.Float64:
return rv.Float(), nil
case reflect.Bool:
return rv.Bool(), nil
case reflect.Slice:
ek := rv.Type().Elem().Kind()
if ek == reflect.Uint8 {
return rv.Bytes(), nil
}
return nil, fmt.Errorf("unsupported type %T, a slice of %s", v, ek)
case reflect.String:
return rv.String(), nil
}
return nil, fmt.Errorf("unsupported type %T, a %s", v, rv.Kind())
}
var valuerReflectType = reflect.TypeOf((*driver.Valuer)(nil)).Elem()
// callValuerValue returns vr.Value(), with one exception:
// If vr.Value is an auto-generated method on a pointer type and the
// pointer is nil, it would panic at runtime in the panicwrap
// method. Treat it like nil instead.
//
// This is so people can implement driver.Value on value types and
// still use nil pointers to those types to mean nil/NULL, just like
// string/*string.
//
// This is an exact copy of the same-named unexported function from the
// database/sql package.
func callValuerValue(vr driver.Valuer) (v driver.Value, err error) {
if rv := reflect.ValueOf(vr); rv.Kind() == reflect.Ptr &&
rv.IsNil() &&
rv.Type().Elem().Implements(valuerReflectType) {
return nil, nil
}
return vr.Value()
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
import (
"bufio"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
)
// Various errors the driver might return.
var (
errInvalidConn = errors.New("invalid connection")
errConnNoExist = errors.New("no existent connection ")
)
var taosLog *log.Logger
// SetLogger is used to set the logger for critical errors.
// The initial logger
func taosLogInit() {
cfgName := "/etc/taos/taos.cfg"
logNameDefault := "/var/log/taos/taosgo.log"
var logName string
// get log path from cfg file
cfgFile, err := os.OpenFile(cfgName, os.O_RDONLY, 0644)
defer cfgFile.Close()
if err != nil {
fmt.Println(err)
logName = logNameDefault
} else {
logName, err = getLogNameFromCfg(cfgFile)
if err != nil {
fmt.Println(err)
logName = logNameDefault
}
}
logFile, err := os.OpenFile(logName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
taosLog = log.New(logFile, "", log.LstdFlags)
taosLog.SetPrefix("TAOS DRIVER ")
taosLog.SetFlags(log.LstdFlags|log.Lshortfile)
}
func getLogNameFromCfg(f *os.File) (string, error) {
// Create file buf, *Reader
r := bufio.NewReader(f)
for {
//read one line, return to slice b
b, _, err := r.ReadLine()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}
// Remove space of left and right
s := strings.TrimSpace(string(b))
if strings.Index(s, "#") == 0 {
// comment line
continue
}
if len(s) == 0 {
continue
}
var ns string
// If there is a comment on the right of the line, must be remove
index := strings.Index(s, "#")
if index > 0 {
// Gets the string to the left of the comment to determine whether it is empty
ns = s[:index]
if len(ns) == 0 {
continue
}
} else {
ns = s;
}
ss := strings.Fields(ns)
if strings.Compare("logDir", ss[0]) != 0 {
continue
}
if len(ss) < 2 {
break
}
// Add a filename after the path
logName := ss[1] + "/taosgo.log"
return logName,nil
}
return "", errors.New("no config log path, use default")
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
/*
#cgo CFLAGS : -I/usr/include
#cgo LDFLAGS: -L/usr/lib -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
*/
import "C"
import (
"errors"
"unsafe"
)
func (mc *taosConn) taosConnect(ip, user, pass, db string, port int) (taos unsafe.Pointer, err error) {
cuser := C.CString(user)
cpass := C.CString(pass)
cip := C.CString(ip)
cdb := C.CString(db)
defer C.free(unsafe.Pointer(cip))
defer C.free(unsafe.Pointer(cuser))
defer C.free(unsafe.Pointer(cpass))
defer C.free(unsafe.Pointer(cdb))
taosObj := C.taos_connect(cip, cuser, cpass, cdb, (C.ushort)(port))
if taosObj == nil {
return nil, errors.New("taos_connect() fail!")
}
return (unsafe.Pointer)(taosObj), nil
}
func (mc *taosConn) taosQuery(sqlstr string) (int, error) {
//taosLog.Printf("taosQuery() input sql:%s\n", sqlstr)
csqlstr := C.CString(sqlstr)
defer C.free(unsafe.Pointer(csqlstr))
code := int(C.taos_query(mc.taos, csqlstr))
if 0 != code {
mc.taos_error()
errStr := C.GoString(C.taos_errstr(mc.taos))
taosLog.Println("taos_query() failed:", errStr)
taosLog.Printf("taosQuery() input sql:%s\n", sqlstr)
return 0, errors.New(errStr)
}
// read result and save into mc struct
num_fields := int(C.taos_field_count(mc.taos))
if 0 == num_fields { // there are no select and show kinds of commands
mc.affectedRows = int(C.taos_affected_rows(mc.taos))
mc.insertId = 0
}
return num_fields, nil
}
func (mc *taosConn) taos_close() {
C.taos_close(mc.taos)
}
func (mc *taosConn) taos_error() {
// free local resouce: allocated memory/metric-meta refcnt
//var pRes unsafe.Pointer
pRes := C.taos_use_result(mc.taos)
C.taos_free_result(pRes)
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package taosSql
/*
#cgo CFLAGS : -I/usr/include
#include <stdlib.h>
#cgo LDFLAGS: -L/usr/lib -ltaos
void taosSetAllocMode(int mode, const char* path, _Bool autoDump);
void taosDumpMemoryLeak();
*/
import "C"
import (
"database/sql/driver"
"errors"
"fmt"
"sync/atomic"
"time"
"unsafe"
)
// Returns the bool value of the input.
// The 2nd return value indicates if the input was a valid bool value
func readBool(input string) (value bool, valid bool) {
switch input {
case "1", "true", "TRUE", "True":
return true, true
case "0", "false", "FALSE", "False":
return false, true
}
// Not a valid bool value
return
}
/******************************************************************************
* Time related utils *
******************************************************************************/
// NullTime represents a time.Time that may be NULL.
// NullTime implements the Scanner interface so
// it can be used as a scan destination:
//
// var nt NullTime
// err := db.QueryRow("SELECT time FROM foo WHERE id=?", id).Scan(&nt)
// ...
// if nt.Valid {
// // use nt.Time
// } else {
// // NULL value
// }
//
// This NullTime implementation is not driver-specific
type NullTime struct {
Time time.Time
Valid bool // Valid is true if Time is not NULL
}
// Scan implements the Scanner interface.
// The value type must be time.Time or string / []byte (formatted time-string),
// otherwise Scan fails.
func (nt *NullTime) Scan(value interface{}) (err error) {
if value == nil {
nt.Time, nt.Valid = time.Time{}, false
return
}
switch v := value.(type) {
case time.Time:
nt.Time, nt.Valid = v, true
return
case []byte:
nt.Time, err = parseDateTime(string(v), time.UTC)
nt.Valid = (err == nil)
return
case string:
nt.Time, err = parseDateTime(v, time.UTC)
nt.Valid = (err == nil)
return
}
nt.Valid = false
return fmt.Errorf("Can't convert %T to time.Time", value)
}
// Value implements the driver Valuer interface.
func (nt NullTime) Value() (driver.Value, error) {
if !nt.Valid {
return nil, nil
}
return nt.Time, nil
}
func parseDateTime(str string, loc *time.Location) (t time.Time, err error) {
base := "0000-00-00 00:00:00.0000000"
switch len(str) {
case 10, 19, 21, 22, 23, 24, 25, 26: // up to "YYYY-MM-DD HH:MM:SS.MMMMMM"
if str == base[:len(str)] {
return
}
t, err = time.Parse(timeFormat[:len(str)], str)
default:
err = fmt.Errorf("invalid time string: %s", str)
return
}
// Adjust location
if err == nil && loc != time.UTC {
y, mo, d := t.Date()
h, mi, s := t.Clock()
t, err = time.Date(y, mo, d, h, mi, s, t.Nanosecond(), loc), nil
}
return
}
// zeroDateTime is used in formatBinaryDateTime to avoid an allocation
// if the DATE or DATETIME has the zero value.
// It must never be changed.
// The current behavior depends on database/sql copying the result.
var zeroDateTime = []byte("0000-00-00 00:00:00.000000")
const digits01 = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
const digits10 = "0000000000111111111122222222223333333333444444444455555555556666666666777777777788888888889999999999"
/******************************************************************************
* Convert from and to bytes *
******************************************************************************/
func uint64ToBytes(n uint64) []byte {
return []byte{
byte(n),
byte(n >> 8),
byte(n >> 16),
byte(n >> 24),
byte(n >> 32),
byte(n >> 40),
byte(n >> 48),
byte(n >> 56),
}
}
func uint64ToString(n uint64) []byte {
var a [20]byte
i := 20
// U+0030 = 0
// ...
// U+0039 = 9
var q uint64
for n >= 10 {
i--
q = n / 10
a[i] = uint8(n-q*10) + 0x30
n = q
}
i--
a[i] = uint8(n) + 0x30
return a[i:]
}
// treats string value as unsigned integer representation
func stringToInt(b []byte) int {
val := 0
for i := range b {
val *= 10
val += int(b[i] - 0x30)
}
return val
}
// reserveBuffer checks cap(buf) and expand buffer to len(buf) + appendSize.
// If cap(buf) is not enough, reallocate new buffer.
func reserveBuffer(buf []byte, appendSize int) []byte {
newSize := len(buf) + appendSize
if cap(buf) < newSize {
// Grow buffer exponentially
newBuf := make([]byte, len(buf)*2+appendSize)
copy(newBuf, buf)
buf = newBuf
}
return buf[:newSize]
}
// escapeBytesBackslash escapes []byte with backslashes (\)
// This escapes the contents of a string (provided as []byte) by adding backslashes before special
// characters, and turning others into specific escape sequences, such as
// turning newlines into \n and null bytes into \0.
func escapeBytesBackslash(buf, v []byte) []byte {
pos := len(buf)
buf = reserveBuffer(buf, len(v)*2)
for _, c := range v {
switch c {
case '\x00':
buf[pos] = '\\'
buf[pos+1] = '0'
pos += 2
case '\n':
buf[pos] = '\\'
buf[pos+1] = 'n'
pos += 2
case '\r':
buf[pos] = '\\'
buf[pos+1] = 'r'
pos += 2
case '\x1a':
buf[pos] = '\\'
buf[pos+1] = 'Z'
pos += 2
case '\'':
buf[pos] = '\\'
buf[pos+1] = '\''
pos += 2
case '"':
buf[pos] = '\\'
buf[pos+1] = '"'
pos += 2
case '\\':
buf[pos] = '\\'
buf[pos+1] = '\\'
pos += 2
default:
buf[pos] = c
pos++
}
}
return buf[:pos]
}
// escapeStringBackslash is similar to escapeBytesBackslash but for string.
func escapeStringBackslash(buf []byte, v string) []byte {
pos := len(buf)
buf = reserveBuffer(buf, len(v)*2)
for i := 0; i < len(v); i++ {
c := v[i]
switch c {
case '\x00':
buf[pos] = '\\'
buf[pos+1] = '0'
pos += 2
case '\n':
buf[pos] = '\\'
buf[pos+1] = 'n'
pos += 2
case '\r':
buf[pos] = '\\'
buf[pos+1] = 'r'
pos += 2
case '\x1a':
buf[pos] = '\\'
buf[pos+1] = 'Z'
pos += 2
//case '\'':
// buf[pos] = '\\'
// buf[pos+1] = '\''
// pos += 2
case '"':
buf[pos] = '\\'
buf[pos+1] = '"'
pos += 2
case '\\':
buf[pos] = '\\'
buf[pos+1] = '\\'
pos += 2
default:
buf[pos] = c
pos++
}
}
return buf[:pos]
}
// escapeBytesQuotes escapes apostrophes in []byte by doubling them up.
// This escapes the contents of a string by doubling up any apostrophes that
// it contains. This is used when the NO_BACKSLASH_ESCAPES SQL_MODE is in
// effect on the server.
func escapeBytesQuotes(buf, v []byte) []byte {
pos := len(buf)
buf = reserveBuffer(buf, len(v)*2)
for _, c := range v {
if c == '\'' {
buf[pos] = '\''
buf[pos+1] = '\''
pos += 2
} else {
buf[pos] = c
pos++
}
}
return buf[:pos]
}
// escapeStringQuotes is similar to escapeBytesQuotes but for string.
func escapeStringQuotes(buf []byte, v string) []byte {
pos := len(buf)
buf = reserveBuffer(buf, len(v)*2)
for i := 0; i < len(v); i++ {
c := v[i]
if c == '\'' {
buf[pos] = '\''
buf[pos+1] = '\''
pos += 2
} else {
buf[pos] = c
pos++
}
}
return buf[:pos]
}
/******************************************************************************
* Sync utils *
******************************************************************************/
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://github.com/golang/go/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
// atomicBool is a wrapper around uint32 for usage as a boolean value with
// atomic access.
type atomicBool struct {
_noCopy noCopy
value uint32
}
// IsSet returns whether the current boolean value is true
func (ab *atomicBool) IsSet() bool {
return atomic.LoadUint32(&ab.value) > 0
}
// Set sets the value of the bool regardless of the previous value
func (ab *atomicBool) Set(value bool) {
if value {
atomic.StoreUint32(&ab.value, 1)
} else {
atomic.StoreUint32(&ab.value, 0)
}
}
// TrySet sets the value of the bool and returns whether the value changed
func (ab *atomicBool) TrySet(value bool) bool {
if value {
return atomic.SwapUint32(&ab.value, 1) == 0
}
return atomic.SwapUint32(&ab.value, 0) > 0
}
// atomicError is a wrapper for atomically accessed error values
type atomicError struct {
_noCopy noCopy
value atomic.Value
}
// Set sets the error value regardless of the previous value.
// The value must not be nil
func (ae *atomicError) Set(value error) {
ae.value.Store(value)
}
// Value returns the current error value
func (ae *atomicError) Value() error {
if v := ae.value.Load(); v != nil {
// this will panic if the value doesn't implement the error interface
return v.(error)
}
return nil
}
func namedValueToValue(named []driver.NamedValue) ([]driver.Value, error) {
dargs := make([]driver.Value, len(named))
for n, param := range named {
if len(param.Name) > 0 {
// TODO: support the use of Named Parameters #561
return nil, errors.New("taosSql: driver does not support the use of Named Parameters")
}
dargs[n] = param.Value
}
return dargs, nil
}
/******************************************************************************
* Utils for C memory issues debugging *
******************************************************************************/
func SetAllocMode(mode int32, path string) {
cpath := C.CString(path)
defer C.free(unsafe.Pointer(cpath))
C.taosSetAllocMode(C.int(mode), cpath, false)
}
func DumpMemoryLeak() {
C.taosDumpMemoryLeak()
}
...@@ -41,7 +41,7 @@ typedef struct { ...@@ -41,7 +41,7 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SWriteMsg; } SWriteMsg;
typedef struct _wworker_pool { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
......
...@@ -15,19 +15,13 @@ ...@@ -15,19 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tscompression.h" #include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "name.h" #include "name.h"
#include "taccount.h" #include "taccount.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
...@@ -42,6 +36,7 @@ ...@@ -42,6 +36,7 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "tcompare.h"
void * tsChildTableSdb; void * tsChildTableSdb;
void * tsSuperTableSdb; void * tsSuperTableSdb;
......
...@@ -160,7 +160,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -160,7 +160,7 @@ typedef struct SQueryRuntimeEnv {
SQueryCostSummary summary; SQueryCostSummary summary;
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
...@@ -172,6 +172,8 @@ typedef struct SQInfo { ...@@ -172,6 +172,8 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
sem_t dataReady; sem_t dataReady;
SArray* pTableIdList; // table id list SArray* pTableIdList; // table id list
void* tsdb;
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
......
...@@ -79,17 +79,10 @@ extern "C" { ...@@ -79,17 +79,10 @@ extern "C" {
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF #define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF #define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
#define TSDB_PATTERN_MATCH 0
#define TSDB_PATTERN_NOMATCH 1
#define TSDB_PATTERN_NOWILDCARDMATCH 2
#define TSDB_PATTERN_STRING_MAX_LEN 20
#define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16 #define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16
#define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50 #define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50
#define PATTERN_COMPARE_INFO_INITIALIZER \
{ '%', '_' }
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value #define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG) #define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
...@@ -222,20 +215,11 @@ typedef struct SQLAggFuncElem { ...@@ -222,20 +215,11 @@ typedef struct SQLAggFuncElem {
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
} SQLAggFuncElem; } SQLAggFuncElem;
typedef struct SPatternCompareInfo {
char matchAll; // symbol for match all wildcard, default: '%'
char matchOne; // symbol for match one wildcard, default: '_'
} SPatternCompareInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int16_t *interResBytes, int16_t extLength, bool isSuperTable); int16_t *len, int16_t *interResBytes, int16_t extLength, bool isSuperTable);
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0) #define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0) #define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0) #define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
......
...@@ -362,10 +362,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio ...@@ -362,10 +362,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio
bool doRevisedResultsByLimit(SQInfo *pQInfo) { bool doRevisedResultsByLimit(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if ((pQuery->limit.limit > 0) && (pQuery->rec.rows + pQuery->rec.rows > pQuery->limit.limit)) { if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.rows; pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total;
assert(pQuery->rec.rows > 0);
// query completed
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
return true; return true;
} }
...@@ -1552,6 +1552,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1552,6 +1552,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
...@@ -2501,17 +2503,20 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl ...@@ -2501,17 +2503,20 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
} }
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int32_t midPos = -1;
int numOfPoints; int32_t numOfPoints;
TSKEY *keyList;
if (num <= 0) return -1; if (num <= 0) {
return -1;
}
keyList = (TSKEY *)pValue; assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
firstPos = 0;
lastPos = num - 1; TSKEY* keyList = (TSKEY *)pValue;
int32_t firstPos = 0;
int32_t lastPos = num - 1;
if (order == 0) { if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key // find the first position which is smaller than the key
while (1) { while (1) {
if (key >= keyList[lastPos]) return lastPos; if (key >= keyList[lastPos]) return lastPos;
...@@ -2565,7 +2570,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2565,7 +2570,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
...@@ -3520,8 +3525,9 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3520,8 +3525,9 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// store the start query position // store the start query position
void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); // void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
int64_t skey = pQuery->lastKey; int64_t skey = pQuery->lastKey;
int32_t status = pQuery->status; int32_t status = pQuery->status;
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex;
...@@ -3543,15 +3549,32 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3543,15 +3549,32 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks. // set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
/*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos); // /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
STsdbQueryCond cond = {
.twindow = {pQuery->window.skey, pQuery->lastKey},
.order = pQuery->order.order,
.colList = pQuery->colList,
};
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
taosArrayPush(cols, &pQuery->colList[i]);
}
if (pRuntimeEnv->pSecQueryHandle != NULL) {
pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols);
}
taosArrayDestroy(cols);
status = pQuery->status; status = pQuery->status;
pRuntimeEnv->windowResInfo.curIndex = activeSlot; pRuntimeEnv->windowResInfo.curIndex = activeSlot;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
/* check if query is killed or not */ // check if query is killed or not
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return; return;
} }
...@@ -4179,6 +4202,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) ...@@ -4179,6 +4202,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
taosArrayDestroy(cols); taosArrayDestroy(cols);
pQInfo->tsdb = tsdb;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param; pRuntimeEnv->pTSBuf = param;
...@@ -4972,7 +4996,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4972,7 +4996,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
while (1) { while (1) {
// initCtxOutputBuf(pRuntimeEnv);
scanAllDataBlocks(pRuntimeEnv); scanAllDataBlocks(pRuntimeEnv);
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <iostream> #include <iostream>
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tcompare.h"
TEST(testCase, patternMatchTest) { TEST(testCase, patternMatchTest) {
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_server_header_
#define _rpc_server_header_
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param);
void taosCloseTcpServerConnection(void *param);
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus
}
#endif
#endif
...@@ -13,20 +13,22 @@ ...@@ -13,20 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _rpc_client_header_ #ifndef _rpc_tcp_header_
#define _rpc_client_header_ #define _rpc_tcp_header_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "taosdef.h" void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
void taosCloseTcpClientConnection(void *chandle);
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,18 +22,18 @@ ...@@ -22,18 +22,18 @@
#include "tutil.h" #include "tutil.h"
#include "rpcCache.h" #include "rpcCache.h"
typedef struct _c_hash_t { typedef struct SConnHash {
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char connType; char connType;
struct _c_hash_t *prev; struct SConnHash *prev;
struct _c_hash_t *next; struct SConnHash *next;
void * data; void *data;
uint64_t time; uint64_t time;
} SConnHash; } SConnHash;
typedef struct { typedef struct {
SConnHash ** connHashList; SConnHash **connHashList;
mpool_h connHashMemPool; mpool_h connHashMemPool;
int maxSessions; int maxSessions;
int total; int total;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tsocket.h"
#include "tutil.h"
#include "rpcClient.h"
#include "rpcHead.h"
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
typedef struct _tcp_fd {
void *signature;
int fd; // TCP socket FD
void * thandle;
uint32_t ip;
char ipstr[20];
uint16_t port;
struct _tcp_client *pTcp;
struct _tcp_fd * prev, *next;
} STcpFd;
typedef struct _tcp_client {
pthread_t thread;
STcpFd * pHead;
pthread_mutex_t mutex;
pthread_cond_t fdReady;
int pollFd;
int numOfFds;
char label[12];
char ipstr[20];
void *shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pRecv);
} STcpClient;
#define maxTcpEvents 100
static void taosCleanUpTcpFdObj(STcpFd *pFdObj);
static void *taosReadTcpData(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
STcpClient *pTcp;
pthread_attr_t thattr;
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
memset(pTcp, 0, sizeof(STcpClient));
strcpy(pTcp->label, label);
strcpy(pTcp->ipstr, ip);
pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
return NULL;
}
pTcp->processData = fp;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
pthread_attr_destroy(&thattr);
if (code != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
return NULL;
}
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
return pTcp;
}
void taosCleanUpTcpClient(void *chandle) {
STcpClient *pTcp = (STcpClient *)chandle;
if (pTcp == NULL) return;
while (pTcp->pHead) {
taosCleanUpTcpFdObj(pTcp->pHead);
pTcp->pHead = pTcp->pHead->next;
}
close(pTcp->pollFd);
pthread_cancel(pTcp->thread);
pthread_join(pTcp->thread, NULL);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree(pTcp);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
STcpClient * pTcp = (STcpClient *)shandle;
STcpFd * pFdObj;
struct epoll_event event;
struct in_addr destIp;
int fd;
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
if (fd <= 0) return NULL;
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
if (pFdObj == NULL) {
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
tclose(fd);
return NULL;
}
memset(pFdObj, 0, sizeof(STcpFd));
pFdObj->fd = fd;
strcpy(pFdObj->ipstr, ip);
inet_aton(ip, &destIp);
pFdObj->ip = destIp.s_addr;
pFdObj->port = port;
pFdObj->pTcp = pTcp;
pFdObj->thandle = thandle;
pFdObj->signature = pFdObj;
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj;
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
tfree(pFdObj);
tclose(fd);
return NULL;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pTcp->mutex));
pFdObj->next = pTcp->pHead;
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
pTcp->pHead = pFdObj;
pTcp->numOfFds++;
pthread_cond_signal(&pTcp->fdReady);
pthread_mutex_unlock(&(pTcp->mutex));
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
return pFdObj;
}
void taosCloseTcpClientConnection(void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (pFdObj == NULL) return;
taosCleanUpTcpFdObj(pFdObj);
}
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
STcpFd *pFdObj = (STcpFd *)chandle;
if (chandle == NULL) return -1;
return (int)send(pFdObj->fd, data, (size_t)len, 0);
}
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
STcpClient *pTcp;
SRecvInfo recvInfo;
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pTcp = pFdObj->pTcp;
if (pTcp == NULL) {
tError("double free TcpFdObj!!!!");
return;
}
epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
pthread_mutex_lock(&pTcp->mutex);
pTcp->numOfFds--;
if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next;
} else {
pTcp->pHead = pFdObj->next;
}
if (pFdObj->next) {
(pFdObj->next)->prev = pFdObj->prev;
}
pthread_mutex_unlock(&pTcp->mutex);
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo);
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd));
tfree(pFdObj);
}
static void *taosReadTcpData(void *param) {
STcpClient *pTcp = (STcpClient *)param;
int i, fdNum;
STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) {
pthread_mutex_lock(&pTcp->mutex);
if (pTcp->numOfFds < 1) pthread_cond_wait(&pTcp->fdReady, &pTcp->mutex);
pthread_mutex_unlock(&pTcp->mutex);
fdNum = epoll_wait(pTcp->pollFd, events, maxTcpEvents, -1);
if (fdNum < 0) continue;
for (i = 0; i < fdNum; ++i) {
pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) {
tTrace("%s TCP error happened on FD\n", pTcp->label);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
if (events[i].events & EPOLLHUP) {
tTrace("%s TCP FD hang up\n", pTcp->label);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d", pTcp->label, headLen);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
if (NULL == buffer) {
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
char *msg = buffer + tsRpcOverhead;
int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen);
tfree(buffer);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = pFdObj;
recvInfo.connType = RPC_CONN_TCP;
pFdObj->thandle = (*(pTcp->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj);
}
}
return NULL;
}
...@@ -17,13 +17,13 @@ ...@@ -17,13 +17,13 @@
#include "tlog.h" #include "tlog.h"
#include "tmempool.h" #include "tmempool.h"
typedef struct _ip_hash_t { typedef struct SIpHash {
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int hash; int hash;
struct _ip_hash_t *prev; struct SIpHash *prev;
struct _ip_hash_t *next; struct SIpHash *next;
void * data; void *data;
} SIpHash; } SIpHash;
typedef struct { typedef struct {
...@@ -47,7 +47,7 @@ int rpcHashIp(void *handle, uint32_t ip, uint16_t port) { ...@@ -47,7 +47,7 @@ int rpcHashIp(void *handle, uint32_t ip, uint16_t port) {
void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash *pNode;
SHashObj *pObj; SHashObj *pObj;
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
...@@ -70,7 +70,7 @@ void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { ...@@ -70,7 +70,7 @@ void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash *pNode;
SHashObj *pObj; SHashObj *pObj;
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
...@@ -102,7 +102,7 @@ void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { ...@@ -102,7 +102,7 @@ void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) { void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) {
int hash; int hash;
SIpHash * pNode; SIpHash *pNode;
SHashObj *pObj; SHashObj *pObj;
pObj = (SHashObj *)handle; pObj = (SHashObj *)handle;
......
...@@ -27,8 +27,7 @@ ...@@ -27,8 +27,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcCache.h" #include "rpcCache.h"
#include "rpcClient.h" #include "rpcTcp.h"
#include "rpcServer.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "trpc.h" #include "trpc.h"
#include "hash.h" #include "hash.h"
...@@ -67,7 +66,7 @@ typedef struct { ...@@ -67,7 +66,7 @@ typedef struct {
void *udphandle;// returned handle from UDP initialization void *udphandle;// returned handle from UDP initialization
void *pCache; // connection cache void *pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct _RpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct { typedef struct {
...@@ -88,7 +87,7 @@ typedef struct { ...@@ -88,7 +87,7 @@ typedef struct {
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
typedef struct _RpcConn { typedef struct SRpcConn {
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
...@@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = { ...@@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = {
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData,
taosSendUdpData, taosSendUdpData,
taosSendTcpServerData, taosSendTcpData,
taosSendTcpClientData taosSendTcpData
}; };
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
...@@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = ...@@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) =
void (*taosCloseConn[])(void *chandle) = { void (*taosCloseConn[])(void *chandle) = {
NULL, NULL,
NULL, NULL,
taosCloseTcpServerConnection, taosCloseTcpConnection,
taosCloseTcpClientConnection taosCloseTcpConnection
}; };
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType);
...@@ -817,7 +816,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -817,7 +816,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn); tTrace("%s %p, link is broken", pRpc->label, pConn);
pConn->chandle = NULL; // pConn->chandle = NULL;
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
int tsUdpDelay = 0; int tsUdpDelay = 0;
typedef struct { typedef struct {
void * signature; void *signature;
int index; int index;
int fd; int fd;
uint16_t port; // peer port uint16_t port; // peer port
...@@ -53,23 +53,23 @@ typedef struct { ...@@ -53,23 +53,23 @@ typedef struct {
int server; int server;
char ip[16]; // local IP char ip[16]; // local IP
uint16_t port; // local Port uint16_t port; // local Port
void * shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[12]; char label[12];
void * tmrCtrl; void *tmrCtrl;
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
typedef struct { typedef struct {
void * signature; void *signature;
uint32_t ip; // dest IP uint32_t ip; // dest IP
uint16_t port; // dest Port uint16_t port; // dest Port
SUdpConn * pConn; SUdpConn *pConn;
struct sockaddr_in destAdd; struct sockaddr_in destAdd;
void * msgHdr; void *msgHdr;
int totalLen; int totalLen;
void * timer; void *timer;
int emptyNum; int emptyNum;
} SUdpBuf; } SUdpBuf;
...@@ -78,9 +78,8 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); ...@@ -78,9 +78,8 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
static void taosProcessUdpBufTimer(void *param, void *tmrId); static void taosProcessUdpBufTimer(void *param, void *tmrId);
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
pthread_attr_t thAttr; SUdpConn *pConn;
SUdpConn * pConn; SUdpConnSet *pSet;
SUdpConnSet * pSet;
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn); int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
pSet = (SUdpConnSet *)malloc((size_t)size); pSet = (SUdpConnSet *)malloc((size_t)size);
...@@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
} }
} }
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
uint16_t ownPort; uint16_t ownPort;
for (int i = 0; i < threads; ++i) { for (int i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
...@@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pConn->tmrCtrl = pSet->tmrCtrl; pConn->tmrCtrl = pSet->tmrCtrl;
} }
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
pthread_attr_destroy(&thAttr);
if (code != 0) { if (code != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd); taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet); taosCleanUpUdpConnection(pSet);
pthread_attr_destroy(&thAttr);
return NULL; return NULL;
} }
++pSet->threads; ++pSet->threads;
} }
pthread_attr_destroy(&thAttr);
tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads);
return pSet; return pSet;
...@@ -166,7 +164,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -166,7 +164,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
void taosCleanUpUdpConnection(void *handle) { void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn; SUdpConn *pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
...@@ -207,10 +205,10 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por ...@@ -207,10 +205,10 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
} }
static void *taosRecvUdpData(void *param) { static void *taosRecvUdpData(void *param) {
SUdpConn *pConn = param;
struct sockaddr_in sourceAdd; struct sockaddr_in sourceAdd;
int dataLen; int dataLen;
unsigned int addLen; unsigned int addLen;
SUdpConn * pConn = (SUdpConn *)param;
uint16_t port; uint16_t port;
int minSize = sizeof(SRpcHead); int minSize = sizeof(SRpcHead);
SRecvInfo recvInfo; SRecvInfo recvInfo;
...@@ -276,7 +274,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -276,7 +274,7 @@ static void *taosRecvUdpData(void *param) {
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle; SUdpConn *pConn = (SUdpConn *)chandle;
SUdpBuf * pBuf; SUdpBuf *pBuf;
if (pConn == NULL || pConn->signature != pConn) return -1; if (pConn == NULL || pConn->signature != pConn) return -1;
......
...@@ -204,6 +204,8 @@ int main(int argc, char *argv[]) { ...@@ -204,6 +204,8 @@ int main(int argc, char *argv[]) {
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
getchar();
taosCloseLogger(); taosCloseLogger();
return 0; return 0;
......
...@@ -16,8 +16,24 @@ ...@@ -16,8 +16,24 @@
#ifndef TDENGINE_TCOMPARE_H #ifndef TDENGINE_TCOMPARE_H
#define TDENGINE_TCOMPARE_H #define TDENGINE_TCOMPARE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h" #include "os.h"
#define TSDB_PATTERN_MATCH 0
#define TSDB_PATTERN_NOMATCH 1
#define TSDB_PATTERN_NOWILDCARDMATCH 2
#define TSDB_PATTERN_STRING_MAX_LEN 20
#define PATTERN_COMPARE_INFO_INITIALIZER { '%', '_' }
typedef struct SPatternCompareInfo {
char matchAll; // symbol for match all wildcard, default: '%'
char matchOne; // symbol for match one wildcard, default: '_'
} SPatternCompareInfo;
int32_t compareInt32Val(const void *pLeft, const void *pRight); int32_t compareInt32Val(const void *pLeft, const void *pRight);
int32_t compareInt64Val(const void *pLeft, const void *pRight); int32_t compareInt64Val(const void *pLeft, const void *pRight);
...@@ -36,8 +52,16 @@ int32_t compareStrVal(const void *pLeft, const void *pRight); ...@@ -36,8 +52,16 @@ int32_t compareStrVal(const void *pLeft, const void *pRight);
int32_t compareWStrVal(const void *pLeft, const void *pRight); int32_t compareWStrVal(const void *pLeft, const void *pRight);
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
__compar_fn_t getKeyComparFunc(int32_t keyType); __compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType); __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TCOMPARE_H #endif // TDENGINE_TCOMPARE_H
#include "taosdef.h" #include "taosdef.h"
#include "tcompare.h" #include "tcompare.h"
#include "tutil.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) { int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight); int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
if (ret == 0) { if (ret == 0) {
...@@ -11,7 +13,7 @@ int32_t compareInt32Val(const void *pLeft, const void *pRight) { ...@@ -11,7 +13,7 @@ int32_t compareInt32Val(const void *pLeft, const void *pRight) {
} }
int32_t compareInt64Val(const void *pLeft, const void *pRight) { int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int32_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight); int64_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight);
if (ret == 0) { if (ret == 0) {
return 0; return 0;
} else { } else {
...@@ -102,6 +104,143 @@ int32_t compareWStrVal(const void *pLeft, const void *pRight) { ...@@ -102,6 +104,143 @@ int32_t compareWStrVal(const void *pLeft, const void *pRight) {
return 0; return 0;
} }
/*
* Compare two strings
* TSDB_MATCH: Match
* TSDB_NOMATCH: No match
* TSDB_NOWILDCARDMATCH: No match in spite of having * or % wildcards.
* Like matching rules:
* '%': Matches zero or more characters
* '_': Matches one character
*
*/
int patternMatch(const char *patterStr, const char *str, size_t size, const SPatternCompareInfo *pInfo) {
char c, c1;
int32_t i = 0;
int32_t j = 0;
while ((c = patterStr[i++]) != 0) {
if (c == pInfo->matchAll) { /* Match "*" */
while ((c = patterStr[i++]) == pInfo->matchAll || c == pInfo->matchOne) {
if (c == pInfo->matchOne && (j > size || str[j++] == 0)) {
// empty string, return not match
return TSDB_PATTERN_NOWILDCARDMATCH;
}
}
if (c == 0) {
return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */
}
char next[3] = {toupper(c), tolower(c), 0};
while (1) {
size_t n = strcspn(str, next);
str += n;
if (str[0] == 0 || (n >= size - 1)) {
break;
}
int32_t ret = patternMatch(&patterStr[i], ++str, size - n - 1, pInfo);
if (ret != TSDB_PATTERN_NOMATCH) {
return ret;
}
}
return TSDB_PATTERN_NOWILDCARDMATCH;
}
c1 = str[j++];
if (j <= size) {
if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) {
continue;
}
}
return TSDB_PATTERN_NOMATCH;
}
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) {
wchar_t c, c1;
wchar_t matchOne = L'_'; // "_"
wchar_t matchAll = L'%'; // "%"
int32_t i = 0;
int32_t j = 0;
while ((c = patterStr[i++]) != 0) {
if (c == matchAll) { /* Match "%" */
while ((c = patterStr[i++]) == matchAll || c == matchOne) {
if (c == matchOne && (j > size || str[j++] == 0)) {
return TSDB_PATTERN_NOWILDCARDMATCH;
}
}
if (c == 0) {
return TSDB_PATTERN_MATCH;
}
wchar_t accept[3] = {towupper(c), towlower(c), 0};
while (1) {
size_t n = wcsspn(str, accept);
str += n;
if (str[0] == 0 || (n >= size - 1)) {
break;
}
str++;
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo);
if (ret != TSDB_PATTERN_NOMATCH) {
return ret;
}
}
return TSDB_PATTERN_NOWILDCARDMATCH;
}
c1 = str[j++];
if (j <= size) {
if (c == c1 || towlower(c) == towlower(c1) || (c == matchOne && c1 != 0)) {
continue;
}
}
return TSDB_PATTERN_NOMATCH;
}
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
}
static UNUSED_FUNC int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
const char* pattern = pRight;
const char* str = pLeft;
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'};
const wchar_t* pattern = pRight;
const wchar_t* str = pLeft;
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
__compar_fn_t comparFn = NULL; __compar_fn_t comparFn = NULL;
...@@ -109,8 +248,9 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { ...@@ -109,8 +248,9 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT:
if (filterDataType == TSDB_DATA_TYPE_BIGINT) { case TSDB_DATA_TYPE_TIMESTAMP: {
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
comparFn = compareInt64Val; comparFn = compareInt64Val;
break; break;
} }
......
...@@ -75,53 +75,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk ...@@ -75,53 +75,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode); static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode);
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
//static __compar_fn_t getComparFunc(SSkipList *pSkipList, int32_t filterDataType) {
// __compar_fn_t comparFn = NULL;
//
// switch (pSkipList->keyInfo.type) {
// case TSDB_DATA_TYPE_TINYINT:
// case TSDB_DATA_TYPE_SMALLINT:
// case TSDB_DATA_TYPE_INT:
// case TSDB_DATA_TYPE_BIGINT: {
// if (filterDataType == TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareInt64Val;
// break;
// }
// }
// case TSDB_DATA_TYPE_BOOL: {
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareInt32Val;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareIntDoubleVal;
// }
// break;
// }
// case TSDB_DATA_TYPE_FLOAT:
// case TSDB_DATA_TYPE_DOUBLE: {
//// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
//// comparFn = compareDoubleIntVal;
//// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
//// comparFn = compareDoubleVal;
//// }
// if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
// break;
// }
// case TSDB_DATA_TYPE_BINARY:
// comparFn = compareStrVal;
// break;
// case TSDB_DATA_TYPE_NCHAR:
// comparFn = compareWStrVal;
// break;
// default:
// comparFn = compareInt32Val;
// break;
// }
//
// return comparFn;
//}
static bool initForwardBackwardPtr(SSkipList* pSkipList) { static bool initForwardBackwardPtr(SSkipList* pSkipList) {
uint32_t maxLevel = pSkipList->maxLevel; uint32_t maxLevel = pSkipList->maxLevel;
...@@ -445,6 +398,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* ...@@ -445,6 +398,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
iter->cur = forward[0]; // greater equals than the value iter->cur = forward[0]; // greater equals than the value
} else { } else {
iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0); iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0);
if (ret == 0) {
assert(iter->cur != pSkipList->pTail);
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
}
} }
return iter; return iter;
......
...@@ -813,6 +813,13 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) ...@@ -813,6 +813,13 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
return iters; return iters;
} }
static void tsdbFreeMemTable(SMemTable *pMemTable) {
if (pMemTable) {
tSkipListDestroy(pMemTable->pData);
free(pMemTable);
}
}
// Commit to file // Commit to file
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
// TODO // TODO
...@@ -859,7 +866,8 @@ static void *tsdbCommitData(void *arg) { ...@@ -859,7 +866,8 @@ static void *tsdbCommitData(void *arg) {
// TODO: free the skiplist // TODO: free the skiplist
for (int i = 0; i < pCfg->maxTables; i++) { for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) { // Here has memory leak if (pTable && pTable->imem) {
tsdbFreeMemTable(pTable->imem);
pTable->imem = NULL; pTable->imem = NULL;
} }
} }
......
...@@ -34,7 +34,7 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) { ...@@ -34,7 +34,7 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
*contLen = tsdbEstimateTableEncodeSize(pTable); *contLen = tsdbEstimateTableEncodeSize(pTable);
if (*contLen < 0) return NULL; if (*contLen < 0) return NULL;
void *ret = malloc(*contLen); void *ret = calloc(1, *contLen);
if (ret == NULL) return NULL; if (ret == NULL) return NULL;
void *ptr = ret; void *ptr = ret;
......
此差异已折叠。
sudo python2 ./test.py -f insert/basic.py sudo python3 ./test.py -f insert/basic.py
...@@ -73,7 +73,6 @@ if __name__ == "__main__": ...@@ -73,7 +73,6 @@ if __name__ == "__main__":
if fileName == "all": if fileName == "all":
tdCases.runAllLinux(conn) tdCases.runAllLinux(conn)
else: else:
tdLog.info("CBD LN78: %s" % (fileName))
tdCases.runOneLinux(conn, fileName) tdCases.runOneLinux(conn, fileName)
conn.close() conn.close()
else: else:
......
此差异已折叠。
...@@ -10,4 +10,6 @@ run general/db/basic3.sim ...@@ -10,4 +10,6 @@ run general/db/basic3.sim
run general/db/basic4.sim run general/db/basic4.sim
run general/db/basic5.sim run general/db/basic5.sim
run general/user/basic1.sim
################################## ##################################
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册