提交 05229b66 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG c9cc20f
GIT_TAG 8a5e336
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
using System;
using System.Collections.Generic;
using TDengineDriver;
using TDengineDriver.Impl;
using System.Runtime.InteropServices;
namespace TDengineExample
......@@ -19,8 +22,8 @@ namespace TDengineExample
{
if (code == 0 && taosRes != IntPtr.Zero)
{
FetchRowAsyncCallback fetchRowAsyncCallback = new FetchRowAsyncCallback(FetchRowCallback);
TDengine.FetchRowAsync(taosRes, fetchRowAsyncCallback, param);
FetchRawBlockAsyncCallback fetchRowAsyncCallback = new FetchRawBlockAsyncCallback(FetchRawBlockCallback);
TDengine.FetchRawBlockAsync(taosRes, fetchRowAsyncCallback, param);
}
else
{
......@@ -28,179 +31,44 @@ namespace TDengineExample
}
}
static void FetchRowCallback(IntPtr param, IntPtr taosRes, int numOfRows)
// Iteratively call this interface until "numOfRows" is no greater than 0.
static void FetchRawBlockCallback(IntPtr param, IntPtr taosRes, int numOfRows)
{
if (numOfRows > 0)
{
Console.WriteLine($"{numOfRows} rows async retrieved");
DisplayRes(taosRes);
TDengine.FetchRowAsync(taosRes, FetchRowCallback, param);
IntPtr pdata = TDengine.GetRawBlock(taosRes);
List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
List<object> dataList = LibTaos.ReadRawBlock(pdata, metaList, numOfRows);
for (int i = 0; i < dataList.Count; i++)
{
if (i != 0 && (i+1) % metaList.Count == 0)
{
Console.WriteLine("{0}\t|", dataList[i]);
}
else
{
Console.Write("{0}\t|", dataList[i]);
}
}
Console.WriteLine("");
TDengine.FetchRawBlockAsync(taosRes, FetchRawBlockCallback, param);
}
else
{
if (numOfRows == 0)
{
Console.WriteLine("async retrieve complete.");
}
else
{
Console.WriteLine($"FetchRowAsync callback error, error code {numOfRows}");
Console.WriteLine($"FetchRawBlockCallback callback error, error code {numOfRows}");
}
TDengine.FreeResult(taosRes);
}
}
public static void DisplayRes(IntPtr res)
{
if (!IsValidResult(res))
{
TDengine.Cleanup();
System.Environment.Exit(1);
}
List<TDengineMeta> metaList = TDengine.FetchFields(res);
int fieldCount = metaList.Count;
// metaList.ForEach((item) => { Console.Write("{0} ({1}) \t|\t", item.name, item.size); });
List<object> dataList = QueryRes(res, metaList);
for (int index = 0; index < dataList.Count; index++)
{
if (index % fieldCount == 0 && index != 0)
{
Console.WriteLine("");
}
Console.Write("{0} \t|\t", dataList[index].ToString());
}
Console.WriteLine("");
}
public static bool IsValidResult(IntPtr res)
{
if ((res == IntPtr.Zero) || (TDengine.ErrorNo(res) != 0))
{
if (res != IntPtr.Zero)
{
Console.Write("reason: " + TDengine.Error(res));
return false;
}
Console.WriteLine("");
return false;
}
return true;
}
private static List<object> QueryRes(IntPtr res, List<TDengineMeta> meta)
{
IntPtr taosRow;
List<object> dataRaw = new();
while ((taosRow = TDengine.FetchRows(res)) != IntPtr.Zero)
{
dataRaw.AddRange(FetchRow(taosRow, res));
}
if (TDengine.ErrorNo(res) != 0)
{
Console.Write("Query is not complete, Error {0} {1}", TDengine.ErrorNo(res), TDengine.Error(res));
}
TDengine.FreeResult(res);
Console.WriteLine("");
return dataRaw;
}
public static List<object> FetchRow(IntPtr taosRow, IntPtr taosRes)//, List<TDengineMeta> metaList, int numOfFiled
{
List<TDengineMeta> metaList = TDengine.FetchFields(taosRes);
int numOfFiled = TDengine.FieldCount(taosRes);
List<object> dataRaw = new();
IntPtr colLengthPrt = TDengine.FetchLengths(taosRes);
int[] colLengthArr = new int[numOfFiled];
Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);
for (int i = 0; i < numOfFiled; i++)
{
TDengineMeta meta = metaList[i];
IntPtr data = Marshal.ReadIntPtr(taosRow, IntPtr.Size * i);
if (data == IntPtr.Zero)
{
dataRaw.Add("NULL");
continue;
}
switch ((TDengineDataType)meta.type)
{
case TDengineDataType.TSDB_DATA_TYPE_BOOL:
bool v1 = Marshal.ReadByte(data) != 0;
dataRaw.Add(v1);
break;
case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
sbyte v2 = (sbyte)Marshal.ReadByte(data);
dataRaw.Add(v2);
break;
case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
short v3 = Marshal.ReadInt16(data);
dataRaw.Add(v3);
break;
case TDengineDataType.TSDB_DATA_TYPE_INT:
int v4 = Marshal.ReadInt32(data);
dataRaw.Add(v4);
break;
case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
long v5 = Marshal.ReadInt64(data);
dataRaw.Add(v5);
break;
case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
dataRaw.Add(v6);
break;
case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
dataRaw.Add(v7);
break;
case TDengineDataType.TSDB_DATA_TYPE_BINARY:
string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v8);
break;
case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
long v9 = Marshal.ReadInt64(data);
dataRaw.Add(v9);
break;
case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v10);
break;
case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
byte v12 = Marshal.ReadByte(data);
dataRaw.Add(v12.ToString());
break;
case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
ushort v13 = (ushort)Marshal.ReadInt16(data);
dataRaw.Add(v13);
break;
case TDengineDataType.TSDB_DATA_TYPE_UINT:
uint v14 = (uint)Marshal.ReadInt32(data);
dataRaw.Add(v14);
break;
case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
ulong v15 = (ulong)Marshal.ReadInt64(data);
dataRaw.Add(v15);
break;
case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
dataRaw.Add(v16);
break;
default:
dataRaw.Add("nonsupport data type");
break;
}
}
return dataRaw;
}
static IntPtr GetConnection()
{
string host = "localhost";
......@@ -223,16 +91,16 @@ namespace TDengineExample
}
}
//output:
// Connect to TDengine success
// 8 rows async retrieved
// 1538548685500 | 11.8 | 221 | 0.28 | california.losangeles | 2 |
// 1538548696600 | 13.4 | 223 | 0.29 | california.losangeles | 2 |
// 1538548685000 | 10.8 | 223 | 0.29 | california.losangeles | 3 |
// 1538548686500 | 11.5 | 221 | 0.35 | california.losangeles | 3 |
// 1538548685000 | 10.3 | 219 | 0.31 | california.sanfrancisco | 2 |
// 1538548695000 | 12.6 | 218 | 0.33 | california.sanfrancisco | 2 |
// 1538548696800 | 12.3 | 221 | 0.31 | california.sanfrancisco | 2 |
// 1538548696650 | 10.3 | 218 | 0.25 | california.sanfrancisco | 3 |
// async retrieve complete.
\ No newline at end of file
// //output:
// // Connect to TDengine success
// // 8 rows async retrieved
// // 1538548685500 | 11.8 | 221 | 0.28 | california.losangeles | 2 |
// // 1538548696600 | 13.4 | 223 | 0.29 | california.losangeles | 2 |
// // 1538548685000 | 10.8 | 223 | 0.29 | california.losangeles | 3 |
// // 1538548686500 | 11.5 | 221 | 0.35 | california.losangeles | 3 |
// // 1538548685000 | 10.3 | 219 | 0.31 | california.sanfrancisco | 2 |
// // 1538548695000 | 12.6 | 218 | 0.33 | california.sanfrancisco | 2 |
// // 1538548696800 | 12.3 | 221 | 0.31 | california.sanfrancisco | 2 |
// // 1538548696650 | 10.3 | 218 | 0.25 | california.sanfrancisco | 3 |
// // async retrieve complete.
\ No newline at end of file
using TDengineDriver;
using TDengineDriver.Impl;
using System.Runtime.InteropServices;
namespace TDengineExample
......@@ -23,7 +24,7 @@ namespace TDengineExample
Console.WriteLine("fieldCount=" + fieldCount);
// print column names
List<TDengineMeta> metas = TDengine.FetchFields(res);
List<TDengineMeta> metas = LibTaos.GetMeta(res);
for (int i = 0; i < metas.Count; i++)
{
Console.Write(metas[i].name + "\t");
......@@ -31,98 +32,17 @@ namespace TDengineExample
Console.WriteLine();
// print values
IntPtr row;
while ((row = TDengine.FetchRows(res)) != IntPtr.Zero)
List<Object> resData = LibTaos.GetData(res);
for (int i = 0; i < resData.Count; i++)
{
List<TDengineMeta> metaList = TDengine.FetchFields(res);
int numOfFiled = TDengine.FieldCount(res);
List<String> dataRaw = new List<string>();
IntPtr colLengthPrt = TDengine.FetchLengths(res);
int[] colLengthArr = new int[numOfFiled];
Marshal.Copy(colLengthPrt, colLengthArr, 0, numOfFiled);
for (int i = 0; i < numOfFiled; i++)
Console.Write($"|{resData[i].ToString()} \t");
if (((i + 1) % metas.Count == 0))
{
TDengineMeta meta = metaList[i];
IntPtr data = Marshal.ReadIntPtr(row, IntPtr.Size * i);
if (data == IntPtr.Zero)
{
Console.Write("NULL\t");
continue;
}
switch ((TDengineDataType)meta.type)
{
case TDengineDataType.TSDB_DATA_TYPE_BOOL:
bool v1 = Marshal.ReadByte(data) == 0 ? false : true;
Console.Write(v1.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
sbyte v2 = (sbyte)Marshal.ReadByte(data);
Console.Write(v2.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
short v3 = Marshal.ReadInt16(data);
Console.Write(v3.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_INT:
int v4 = Marshal.ReadInt32(data);
Console.Write(v4.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
long v5 = Marshal.ReadInt64(data);
Console.Write(v5.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
float v6 = (float)Marshal.PtrToStructure(data, typeof(float));
Console.Write(v6.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
double v7 = (double)Marshal.PtrToStructure(data, typeof(double));
Console.Write(v7.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_BINARY:
string v8 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
Console.Write(v8 + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
long v9 = Marshal.ReadInt64(data);
Console.Write(v9.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
string v10 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
Console.Write(v10 + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
byte v12 = Marshal.ReadByte(data);
Console.Write(v12.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
ushort v13 = (ushort)Marshal.ReadInt16(data);
Console.Write(v13.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_UINT:
uint v14 = (uint)Marshal.ReadInt32(data);
Console.Write(v14.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
ulong v15 = (ulong)Marshal.ReadInt64(data);
Console.Write(v15.ToString() + "\t");
break;
case TDengineDataType.TSDB_DATA_TYPE_JSONTAG:
string v16 = Marshal.PtrToStringUTF8(data, colLengthArr[i]);
Console.Write(v16 + "\t");
break;
default:
Console.Write("nonsupport data type value");
break;
}
Console.WriteLine("");
}
Console.WriteLine();
}
Console.WriteLine();
if (TDengine.ErrorNo(res) != 0)
{
Console.WriteLine($"Query is not complete, Error {TDengine.ErrorNo(res)} {TDengine.Error(res)}");
......
......@@ -15,10 +15,10 @@ namespace TDengineExample
CheckRes(conn, res, "failed to change database");
res = TDengine.Query(conn, "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
CheckRes(conn, res, "failed to create stable");
var sql = "INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
var sql = "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000)('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000)('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
res = TDengine.Query(conn, sql);
CheckRes(conn, res, "failed to insert data");
int affectedRows = TDengine.AffectRows(res);
......
......@@ -21,7 +21,7 @@ namespace TDengineExample
CheckStmtRes(res, "failed to prepare stmt");
// 2. bind table name and tags
TAOS_BIND[] tags = new TAOS_BIND[2] { TaosBind.BindBinary("California.SanFrancisco"), TaosBind.BindInt(2) };
TAOS_MULTI_BIND[] tags = new TAOS_MULTI_BIND[2] { TaosMultiBind.MultiBindBinary(new string[]{"California.SanFrancisco"}), TaosMultiBind.MultiBindInt(new int?[] {2}) };
res = TDengine.StmtSetTbnameTags(stmt, "d1001", tags);
CheckStmtRes(res, "failed to bind table name and tags");
......@@ -44,7 +44,7 @@ namespace TDengineExample
CheckStmtRes(res, "faild to execute");
// 6. free
TaosBind.FreeTaosBind(tags);
TaosMultiBind.FreeTaosBind(tags);
TaosMultiBind.FreeTaosBind(values);
TDengine.Close(conn);
TDengine.Cleanup();
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TDengineTMQ;
using TDengineDriver;
using System.Runtime.InteropServices;
namespace csharp
namespace TMQExample
{
internal class SubscribeDemo
{
static void Main(string[] args)
{
IntPtr conn = GetConnection();
string topic = "topic_example";
Console.WriteLine($"create topic if not exist {topic} as select * from meters");
//create topic
IntPtr res = TDengine.Query(conn, $"create topic if not exists {topic} as select * from meters");
if (res == IntPtr.Zero)
{
throw new Exception($"create topic failed, reason:{TDengine.Error(res)}");
}
var cfg = new ConsumerConfig
{
GourpId = "group_1",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
};
// create consumer
var consumer = new ConsumerBuilder(cfg)
.Build();
// subscribe
consumer.Subscribe(topic);
// consume
for (int i = 0; i < 5; i++)
{
var consumeRes = consumer.Consume(300);
// print consumeResult
foreach (KeyValuePair<TopicPartition, TaosResult> kv in consumeRes.Message)
{
Console.WriteLine("topic partitions:\n{0}", kv.Key.ToString());
kv.Value.Metas.ForEach(meta =>
{
Console.Write("{0} {1}({2}) \t|", meta.name, meta.TypeName(), meta.size);
});
Console.WriteLine("");
kv.Value.Datas.ForEach(data =>
{
Console.WriteLine(data.ToString());
});
}
consumer.Commit(consumeRes);
Console.WriteLine("\n================ {0} done ", i);
}
// retrieve topic list
List<string> topics = consumer.Subscription();
topics.ForEach(t => Console.WriteLine("topic name:{0}", t));
// unsubscribe
consumer.Unsubscribe();
// close consumer after use.Otherwise will lead memory leak.
consumer.Close();
TDengine.Close(conn);
}
static IntPtr GetConnection()
{
string host = "localhost";
short port = 6030;
string username = "root";
string password = "taosdata";
string dbname = "power";
var conn = TDengine.Connect(host, username, password, dbname, port);
if (conn == IntPtr.Zero)
{
Console.WriteLine("Connect to TDengine failed");
System.Environment.Exit(0);
}
else
{
Console.WriteLine("Connect to TDengine success");
}
return conn;
}
}
}
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -9,7 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -5,11 +5,11 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<StartupObject>TDengineExample.SubscribeDemo</StartupObject>
<StartupObject>TMQExample.SubscribeDemo</StartupObject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="1.0.6" />
<PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
......@@ -124,52 +124,49 @@ gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。
在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外, UDF 支持输入与输出类型不一致,用户需要保证输入数据类型与 UDF 程序匹配,UDF 输出数据类型与 OUTPUTTYPE 匹配。
在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外,用户需要保证输入数据类型与 UDF 程序匹配,UDF 输出数据类型与 OUTPUTTYPE 匹配。
- 创建标量函数
```sql
CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
```
- ids(X):标量函数未来在 SQL 指令中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- ids(Y):包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- typename(Z):此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- B:中间计算结果的缓冲区大小,单位是字节,最小 0,最大 512,如果不使用可以不设置。
- function_name:标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
- library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type:此函数计算结果的数据类型名称;
例如,如下语句可以把 add_one.so 创建为系统中可用的 UDF:
例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF:
```sql
CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT;
CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
```
- 创建聚合函数:
```sql
CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];
CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
```
- ids(X):聚合函数未来在 SQL 指令中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- ids(Y):包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- typename(Z):此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- B:中间计算结果的缓冲区大小,单位是字节,最小 0,最大 512,如果不使用可以不设置。
- function_name:聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type:此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- buffer_size:中间计算结果的缓冲区大小,单位是字节。如果不使用可以不设置。
关于中间计算结果的使用,可以参考示例程序[demo.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c)
例如,如下语句可以把 demo.so 创建为系统中可用的 UDF:
例如,如下语句可以把 libsqrsum.so 创建为系统中可用的 UDF:
```sql
CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14;
CREATE AGGREGATE FUNCTION sqr_sum AS "/home/taos/udf_example/libsqrsum.so" OUTPUTTYPE DOUBLE bufsize 8;
```
### 管理 UDF
- 删除指定名称的用户定义函数:
```
DROP FUNCTION ids(X);
DROP FUNCTION function_name;
```
- ids(X):此参数的含义与 CREATE 指令中的 ids(X) 参数一致,也即要删除的函数的名字,例如
- function_name:此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
```sql
DROP FUNCTION add_one;
DROP FUNCTION bit_and;
```
- 显示系统中当前可用的所有 UDF:
```sql
......@@ -180,53 +177,32 @@ SHOW FUNCTIONS;
在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
```sql
SELECT X(c) FROM table/stable;
SELECT X(c1,c2) FROM table/stable;
```
表示对名为 c 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
## UDF 的一些使用限制
在当前版本下,使用 UDF 存在如下这些限制:
表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
1. 在创建和调用 UDF 时,服务端和客户端都只支持 Linux 操作系统;
2. UDF 不能与系统内建的 SQL 函数混合使用,暂不支持在一条 SQL 语句中使用多个不同名的 UDF ;
3. UDF 只支持以单个数据列作为输入;
4. UDF 只要创建成功,就会被持久化存储到 MNode 节点中;
5. 无法通过 RESTful 接口来创建 UDF;
6. UDF 在 SQL 中定义的函数名,必须与 .so 库文件实现中的接口函数名前缀保持一致,也即必须是 udfNormalFunc 的名称,而且不可与 TDengine 中已有的内建 SQL 函数重名。
## 示例代码
### 标量函数示例 [add_one](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c)
<details>
<summary>add_one.c</summary>
```c
{{#include tests/script/sh/add_one.c}}
```
</details>
### 向量函数示例 [abs_max](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c)
### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
<details>
<summary>abs_max.c</summary>
<summary>bit_and.c</summary>
```c
{{#include tests/script/sh/abs_max.c}}
{{#include tests/script/sh/bit_and.c}}
```
</details>
### 使用中间计算结果示例 [demo](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c)
### 聚合函数示例 [sqr_sum](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/sqr_sum.c)
<details>
<summary>demo.c</summary>
<summary>sqr_sum.c</summary>
```c
{{#include tests/script/sh/demo.c}}
{{#include tests/script/sh/sqr_sum.c}}
```
</details>
......@@ -8,21 +8,30 @@ title: 用户自定义函数
## 创建函数
```sql
CREATE [AGGREGATE] FUNCTION func_name AS library_path OUTPUTTYPE type_name [BUFSIZE value]
CREATE [AGGREGATE] FUNCTION func_name AS library_path OUTPUTTYPE type_name [BUFSIZE buffer_size]
```
语法说明:
AGGREGATE:标识此函数是标量函数还是聚集函数。
func_name:函数名,必须与函数实现中udfNormalFunc的实际名称一致。
func_name:函数名,必须与函数实现中 udf 的实际名称一致。
library_path:包含UDF函数实现的动态链接库的绝对路径,是在客户端侧主机上的绝对路径。
OUTPUTTYPE:标识此函数的返回类型。
BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节
type_name:标识此函数的返回类型。
buffer_size:中间结果的缓冲区大小,单位是字节。不设置则默认为0
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)
## 删除自定义函数
```
DROP FUNCTION function_name;
```
- function_name:此参数的含义与 CREATE 指令中的 function_name 参数一致,也即要删除的函数的名字,例如
## 显示 UDF
```sql
DROP FUNCTION func_name
```
\ No newline at end of file
SHOW FUNCTION;
```
......@@ -22,7 +22,9 @@ import CSAsyncQuery from "../../07-develop/04-query-data/_cs_async.mdx"
本文介绍如何在 Linux 或 Windows 环境中安装 `TDengine.Connector`,并通过 `TDengine.Connector` 连接 TDengine 集群,进行数据写入、查询等基本操作。
`TDengine.Connector` 的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-dotnet)。
注意:`TDengine.Connector` 3.x 不兼容 TDengine 2.x,如果在运行 TDengine 2.x 版本的环境下需要使用 C# 连接器请使用 TDengine.Connector 的 1.x 版本 。
`TDengine.Connector` 的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-dotnet/tree/3.0)。
## 支持的平台
......@@ -63,15 +65,15 @@ dotnet add package TDengine.Connector
</TabItem>
<TabItem value="source" label="使用源码获取 C# 驱动">
可以下载 TDengine 的源码,直接引用最新版本的 TDengine.Connector 库
也可以[下载源码](https://github.com/taosdata/taos-connector-dotnet/tree/3.0),直接引用 TDengine.Connector 库
```bash
git clone https://github.com/taosdata/TDengine.git
cd TDengine/src/connector/C#/src/
cp -r TDengineDriver/ myProject
git clone -b 3.0 https://github.com/taosdata/taos-connector-dotnet.git
cd taos-connector-dotnet
cp -r src/ myProject
cd myProject
dotnet add TDengineDriver/TDengineDriver.csproj
dotnet add exmaple.csproj reference src/TDengine.csproj
```
</TabItem>
</Tabs>
......@@ -145,20 +147,19 @@ namespace TDengineExample
|示例程序 | 示例程序描述 |
|--------------------------------------------------------------------------------------------------------------------|--------------------------------------------|
| [C#checker](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/C%23checker) | 使用 TDengine.Connector 可以通过 help 命令中提供的参数,测试C# Driver的同步写入和查询 |
| [TDengineTest](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/TDengineTest) | 使用 TDengine.Connector 实现的简单写入和查询的示例 |
| [insertCn](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/insertCn) | 使用 TDengine.Connector 实现的写入和查询中文字符的示例 |
| [jsonTag](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/jsonTag) | 使用 TDengine.Connector 实现的写入和查询 json tag 类型数据的示例 |
| [stmt](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/stmt) | 使用 TDengine.Connector 实现的参数绑定的示例 |
| [schemaless](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/schemaless) | 使用 TDengine.Connector 实现的使用 schemaless 写入的示例 |
| [benchmark](https://github.com/taosdata/TDengine/tree/develop/examples/C%23/taosdemo) | 使用 TDengine.Connector 实现的简易 Benchmark |
| [async query](https://github.com/taosdata/taos-connector-dotnet/blob/develop/examples/QueryAsyncSample.cs) | 使用 TDengine.Connector 实现的异步查询的示例 |
| [subscribe](https://github.com/taosdata/taos-connector-dotnet/blob/develop/examples/SubscribeSample.cs) | 使用 TDengine.Connector 实现的订阅数据的示例 |
| [CURD](https://github.com/taosdata/taos-connector-dotnet/blob/3.0/examples/Query/Query.cs) | 使用 TDengine.Connector 实现的建表、插入、查询示例 |
| [JSON Tag](https://github.com/taosdata/taos-connector-dotnet/blob/3.0/examples/JSONTag) | 使用 TDengine.Connector 实现的写入和查询 JSON tag 类型数据的示例 |
| [stmt](https://github.com/taosdata/taos-connector-dotnet/tree/3.0/examples/Stmt) | 使用 TDengine.Connector 实现的参数绑定插入和查询的示例 |
| [schemaless](https://github.com/taosdata/taos-connector-dotnet/blob/3.0/examples/schemaless) | 使用 TDengine.Connector 实现的使用 schemaless 写入的示例 |
| [async query](https://github.com/taosdata/taos-connector-dotnet/blob/3.0/examples/AsyncQuery/QueryAsync.cs) | 使用 TDengine.Connector 实现的异步查询的示例 |
| [TMQ](https://github.com/taosdata/taos-connector-dotnet/blob/3.0/examples/TMQ/TMQ.cs) | 使用 TDengine.Connector 实现的订阅数据的示例 |
## 重要更新记录
| TDengine.Connector | 说明 |
|--------------------|--------------------------------|
| 3.0.0 | 支持 TDengine 3.0.0.0,不兼容 2.x。新增接口TDengine.Impl.GetData(),解析查询结果。 |
| 1.0.7 | 修复 TDengine.Query()内存泄露。 |
| 1.0.6 | 修复 schemaless 在 1.0.4 和 1.0.5 中失效 bug。 |
| 1.0.5 | 修复 Windows 同步查询中文报错 bug。 |
| 1.0.4 | 新增异步查询,订阅等功能。修复绑定参数 bug。 |
......
......@@ -227,40 +227,12 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
#### 数据库相关配置参数
创建数据库时的相关参数在 json 配置文件中的 `dbinfo` 中配置,具体参数如下。这些参数与 TDengine 中 `create database` 时所指定的数据库参数相对应。
创建数据库时的相关参数在 json 配置文件中的 `dbinfo` 中配置,个别具体参数如下。其余参数均与 TDengine 中 `create database` 时所指定的数据库参数相对应,详见[../../taos-sql/database]
- **name** : 数据库名。
- **drop** : 插入前是否删除数据库,默认为 true。
- **replica** : 创建数据库时指定的副本数。
- **days** : 单个数据文件中存储数据的时间跨度,默认值为 10。
- **cache** : 缓存块的大小,单位是 MB,默认值是 16。
- **blocks** : 每个 vnode 中缓存块的数量,默认为 6。
- **precision** : 数据库时间精度,默认值为 "ms"。
- **keep** : 保留数据的天数,默认值为 3650。
- **minRows** : 文件块中的最小记录数,默认值为 100。
- **maxRows** : 文件块中的最大记录数,默认值为 4096。
- **comp** : 文件压缩标志,默认值为 2。
- **walLevel** : WAL 级别,默认为 1。
- **cacheLast** : 是否允许将每个表的最后一条记录保留在内存中,默认值为 0,可选值为 0,1,2,3。
- **quorum** : 多副本模式下的写确认数量,默认值为 1。
- **fsync** : 当 wal 设置为 2 时,fsync 的间隔时间,单位为 ms,默认值为 3000。
- **update** : 是否支持数据更新,默认值为 0, 可选值为 0, 1, 2。
#### 超级表相关配置参数
创建超级表时的相关参数在 json 配置文件中的 `super_tables` 中配置,具体参数如下表。
......@@ -335,6 +307,8 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。
- **sma**: 将该列加入bsma中,值为 "yes" 或者 "no",默认为 "no"。
#### 插入行为配置参数
- **thread_count** : 插入数据的线程数量,默认为 8。
......
---
title: 诊断及其他
---
## 网络连接诊断
当出现客户端应用无法访问服务端时,需要确认客户端与服务端之间网络的各端口连通情况,以便有针对性地排除故障。
目前网络连接诊断支持在:Linux 与 Linux,Linux 与 Windows 之间进行诊断测试。
诊断步骤:
1. 如拟诊断的端口范围与服务器 taosd 实例的端口范围相同,须先停掉 taosd 实例
2. 服务端命令行输入:`taos -n server -P <port> -l <pktlen>` 以服务端身份启动对端口 port 为基准端口的监听
3. 客户端命令行输入:`taos -n client -h <fqdn of server> -P <port> -l <pktlen>` 以客户端身份启动对指定的服务器、指定的端口发送测试包
-l <pktlen\>: 测试网络包的大小(单位:字节)。最小值是 11、最大值是 64000,默认值为 1000。
注:两端命令行中指定的测试包长度必须一致,否则测试显示失败。
服务端运行正常的话会输出以下信息:
```bash
# taos -n server -P 6000
12/21 14:50:13.522509 0x7f536f455200 UTL work as server, host:172.27.0.7 startPort:6000 endPort:6011 pkgLen:1000
12/21 14:50:13.522659 0x7f5352242700 UTL TCP server at port:6000 is listening
12/21 14:50:13.522727 0x7f5351240700 UTL TCP server at port:6001 is listening
...
...
...
12/21 14:50:13.523954 0x7f5342fed700 UTL TCP server at port:6011 is listening
12/21 14:50:13.523989 0x7f53437ee700 UTL UDP server at port:6010 is listening
12/21 14:50:13.524019 0x7f53427ec700 UTL UDP server at port:6011 is listening
12/21 14:50:22.192849 0x7f5352242700 UTL TCP: read:1000 bytes from 172.27.0.8 at 6000
12/21 14:50:22.192993 0x7f5352242700 UTL TCP: write:1000 bytes to 172.27.0.8 at 6000
12/21 14:50:22.237082 0x7f5351a41700 UTL UDP: recv:1000 bytes from 172.27.0.8 at 6000
12/21 14:50:22.237203 0x7f5351a41700 UTL UDP: send:1000 bytes to 172.27.0.8 at 6000
12/21 14:50:22.237450 0x7f5351240700 UTL TCP: read:1000 bytes from 172.27.0.8 at 6001
12/21 14:50:22.237576 0x7f5351240700 UTL TCP: write:1000 bytes to 172.27.0.8 at 6001
12/21 14:50:22.281038 0x7f5350a3f700 UTL UDP: recv:1000 bytes from 172.27.0.8 at 6001
12/21 14:50:22.281141 0x7f5350a3f700 UTL UDP: send:1000 bytes to 172.27.0.8 at 6001
...
...
...
12/21 14:50:22.677443 0x7f5342fed700 UTL TCP: read:1000 bytes from 172.27.0.8 at 6011
12/21 14:50:22.677576 0x7f5342fed700 UTL TCP: write:1000 bytes to 172.27.0.8 at 6011
12/21 14:50:22.721144 0x7f53427ec700 UTL UDP: recv:1000 bytes from 172.27.0.8 at 6011
12/21 14:50:22.721261 0x7f53427ec700 UTL UDP: send:1000 bytes to 172.27.0.8 at 6011
```
客户端运行正常会输出以下信息:
```bash
# taos -n client -h 172.27.0.7 -P 6000
12/21 14:50:22.192434 0x7fc95d859200 UTL work as client, host:172.27.0.7 startPort:6000 endPort:6011 pkgLen:1000
12/21 14:50:22.192472 0x7fc95d859200 UTL server ip:172.27.0.7 is resolved from host:172.27.0.7
12/21 14:50:22.236869 0x7fc95d859200 UTL successed to test TCP port:6000
12/21 14:50:22.237215 0x7fc95d859200 UTL successed to test UDP port:6000
...
...
...
12/21 14:50:22.676891 0x7fc95d859200 UTL successed to test TCP port:6010
12/21 14:50:22.677240 0x7fc95d859200 UTL successed to test UDP port:6010
12/21 14:50:22.720893 0x7fc95d859200 UTL successed to test TCP port:6011
12/21 14:50:22.721274 0x7fc95d859200 UTL successed to test UDP port:6011
```
仔细阅读打印出来的错误信息,可以帮助管理员找到原因,以解决问题。
## 启动状态及 RPC 诊断
`taos -n startup -h <fqdn of server>`
判断 taosd 服务端是否成功启动,是数据库管理员经常遇到的一种情形。特别当若干台服务器组成集群时,判断每个服务端实例是否成功启动就会是一个重要问题。除检索 taosd 服务端日志文件进行问题定位、分析外,还可以通过 `taos -n startup -h <fqdn of server>` 来诊断一个 taosd 进程的启动状态。
针对多台服务器组成的集群,当服务启动过程耗时较长时,可通过该命令行来诊断每台服务器的 taosd 实例的启动状态,以准确定位问题。
`taos -n rpc -h <fqdn of server>`
该命令用来诊断已经启动的 taosd 实例的端口是否可正常访问。如果 taosd 程序异常或者失去响应,可以通过 `taos -n rpc -h <fqdn of server>` 来发起一个与指定 fqdn 的 rpc 通信,看看 taosd 是否能收到,以此来判定是网络问题还是 taosd 程序异常问题。
## sync 及 arbitrator 诊断
```
taos -n sync -P 6040 -h <fqdn of server>
taos -n sync -P 6042 -h <fqdn of server>
```
用来诊断 sync 端口是否工作正常,判断服务端 sync 模块是否成功工作。另外,-P 6042 用来诊断 arbitrator 是否配置正常,判断指定服务器的 arbitrator 是否能正常工作。
## 网络速度诊断
`taos -n speed -h <fqdn of server> -P 6030 -N 10 -l 10000000 -S TCP`
从 2.2.0.0 版本开始,taos 工具新提供了一个网络速度诊断的模式,可以对一个正在运行中的 taosd 实例或者 `taos -n server` 方式模拟的一个服务端实例,以非压缩传输的方式进行网络测速。这个模式下可供调整的参数如下:
-n:设为“speed”时,表示对网络速度进行诊断。
-h:所要连接的服务端的 FQDN 或 ip 地址。如果不设置这一项,会使用本机 taos.cfg 文件中 FQDN 参数的设置作为默认值。
-P:所连接服务端的网络端口。默认值为 6030。
-N:诊断过程中使用的网络包总数。最小值是 1、最大值是 10000,默认值为 100。
-l:单个网络包的大小(单位:字节)。最小值是 1024、最大值是 1024 `*` 1024 `*` 1024,默认值为 1024。
-S:网络封包的类型。可以是 TCP 或 UDP,默认值为 TCP。
## FQDN 解析速度诊断
`taos -n fqdn -h <fqdn of server>`
从 2.2.0.0 版本开始,taos 工具新提供了一个 FQDN 解析速度的诊断模式,可以对一个目标 FQDN 地址尝试解析,并记录解析过程中所消耗的时间。这个模式下可供调整的参数如下:
-n:设为“fqdn”时,表示对 FQDN 解析进行诊断。
-h:所要解析的目标 FQDN 地址。如果不设置这一项,会使用本机 taos.cfg 文件中 FQDN 参数的设置作为默认值。
## 服务端日志
taosd 服务端日志文件标志位 debugflag 默认为 131,在 debug 时往往需要将其提升到 135 或 143 。
一旦设定为 135 或 143,日志文件增长很快,特别是写入、查询请求量较大时,增长速度惊人。如合并保存日志,很容易把日志内的关键信息(如配置信息、错误信息等)冲掉。为此,服务端将重要信息日志与其他日志分开存放:
- taosinfo 存放重要信息日志, 包括:INFO/ERROR/WARNING 级别的日志信息。不记录 DEBUG、TRACE 级别的日志。
- taosdlog 服务器端生成的日志,记录 taosinfo 中全部信息外,还根据设置的日志输出级别,记录 DEBUG(日志级别 135)、TRACE(日志级别是 143)。
## 客户端日志
每个独立运行的客户端(一个进程)生成一个独立的客户端日志,其命名方式采用 taoslog+<序号> 的方式命名。文件标志位 debugflag 默认为 131,在 debug 时往往需要将其提升到 135 或 143 。
- taoslog 客户端(driver)生成的日志,默认记录客户端 INFO/ERROR/WARNING 级别日志,还根据设置的日志输出级别,记录 DEBUG(日志级别 135)、TRACE(日志级别是 143)。
其中,日志文件最大长度由 numOfLogLines 来进行配置,一个 taosd 实例最多保留两个文件。
taosd 服务端日志采用异步落盘写入机制,优点是可以避免硬盘写入压力太大,对性能造成很大影响。缺点是,在极端情况下,存在少量日志行数丢失的可能。
---
title: 诊断及其他
---
## 网络连接诊断
当出现客户端应用无法访问服务端时,需要确认客户端与服务端之间网络的各端口连通情况,以便有针对性地排除故障。
目前网络连接诊断支持在:Linux 与 Linux,Linux 与 Windows 之间进行诊断测试。
诊断步骤:
1. 如拟诊断的端口范围与服务器 taosd 实例的端口范围相同,须先停掉 taosd 实例
2. 服务端命令行输入:`taos -n server -P <port> -l <pktlen>` 以服务端身份启动对端口 port 为基准端口的监听
3. 客户端命令行输入:`taos -n client -h <fqdn of server> -P <port> -l <pktlen>` 以客户端身份启动对指定的服务器、指定的端口发送测试包
-l <pktlen\>: 测试网络包的大小(单位:字节)。最小值是 11、最大值是 64000,默认值为 1000。
注:两端命令行中指定的测试包长度必须一致,否则测试显示失败。
服务端运行正常的话会输出以下信息:
```bash
# taos -n server -P 6030 -l 1000
network test server is initialized, port:6030
request is received, size:1000
request is received, size:1000
...
...
...
request is received, size:1000
request is received, size:1000
```
客户端运行正常会输出以下信息:
```bash
# taos -n client -h 172.27.0.7 -P 6000
taos -n client -h v3s2 -P 6030 -l 1000
network test client is initialized, the server is v3s2:6030
request is sent, size:1000
response is received, size:1000
request is sent, size:1000
response is received, size:1000
...
...
...
request is sent, size:1000
response is received, size:1000
request is sent, size:1000
response is received, size:1000
total succ: 100/100 cost: 16.23 ms speed: 5.87 MB/s
```
仔细阅读打印出来的错误信息,可以帮助管理员找到原因,以解决问题。
## 服务端日志
taosd 服务端日志文件标志位 debugflag 默认为 131,在 debug 时往往需要将其提升到 135 或 143 。
一旦设定为 135 或 143,日志文件增长很快,特别是写入、查询请求量较大时,增长速度惊人。请注意日志文件目录所在磁盘的空间大小。
## 客户端日志
每个独立运行的客户端(一个进程)生成一个独立的客户端日志,其命名方式采用 taoslog+<序号> 的方式命名。文件标志位 debugflag 默认为 131,在 debug 时往往需要将其提升到 135 或 143 。
- taoslog 客户端(driver)生成的日志,默认记录客户端 INFO/ERROR/WARNING 级别日志,还根据设置的日志输出级别,记录 DEBUG(日志级别 135)、TRACE(日志级别是 143)。
其中,日志文件最大长度由 numOfLogLines 来进行配置,一个 taosd 实例最多保留两个文件。
taosd 服务端日志采用异步落盘写入机制,优点是可以避免硬盘写入压力太大,对性能造成很大影响。缺点是,在极端情况下,存在少量日志行数丢失的可能。当问题分析需要的时候,可以考虑将 参数 asynclog 设置成 0,修改为同步落盘写入机制,保证日志不会丢失。
......@@ -66,6 +66,25 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED,
};
enum {
TASK_TRIGGER_STATUS__INACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE,
};
enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
};
enum {
TASK_OUTPUT__FIXED_DISPATCH = 1,
TASK_OUTPUT__SHUFFLE_DISPATCH,
TASK_OUTPUT__TABLE,
TASK_OUTPUT__SMA,
TASK_OUTPUT__FETCH,
};
typedef struct {
int8_t type;
} SStreamQueueItem;
......@@ -202,29 +221,6 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
enum {
TASK_EXEC__NONE = 1,
TASK_EXEC__PIPE,
};
enum {
TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE,
};
enum {
TASK_SINK__NONE = 1,
TASK_SINK__TABLE,
TASK_SINK__SMA,
TASK_SINK__FETCH,
};
enum {
TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE,
};
typedef struct {
int32_t nodeId;
int32_t childId;
......@@ -237,11 +233,8 @@ typedef struct {
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t isDataScan;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int8_t isStreamDistributed;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
int8_t taskStatus;
......@@ -252,13 +245,12 @@ typedef struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// used for semi or single task,
// while final task should have processedVer for each child
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer;
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// int32_t numOfVgroups;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......@@ -266,19 +258,13 @@ typedef struct SStreamTask {
// exec
STaskExec exec;
// TODO: unify sink and dispatch
// local sink
union {
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
// remote dispatcher
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
......@@ -292,9 +278,6 @@ typedef struct SStreamTask {
int64_t triggerParam;
void* timer;
// application storage
// void* ahandle;
// msg handle
SMsgCb* pMsgCb;
} SStreamTask;
......@@ -331,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
......@@ -346,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
}
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
}
return 0;
......
......@@ -89,7 +89,7 @@ bool tsSmlDataFormat =
// query
int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 1;
int32_t tsQuerySmaOptimize = 0;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......
......@@ -98,13 +98,11 @@ END:
}
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
......@@ -113,8 +111,6 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
}
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
pTask->sinkType = TASK_SINK__NONE;
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
......@@ -122,7 +118,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
ASSERT(pDb);
if (pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
ASSERT(0);
......@@ -152,7 +148,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
......@@ -178,7 +174,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE);
return 0;
}
......@@ -249,26 +244,20 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
// type
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
}
return 0;
}
......@@ -295,25 +284,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
// source
pTask->isDataScan = 0;
// exec
pTask->execType = TASK_EXEC__NONE;
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->sinkType = TASK_SINK__SMA;
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
return 0;
}
......@@ -338,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (totLevel == 2 || externalTargetDB || multiTarget) {
/*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
......@@ -376,8 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// source
pInnerTask->isDataScan = 0;
pInnerTask->taskLevel = TASK_LEVEL__AGG;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
......@@ -388,9 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
// exec
pInnerTask->execType = TASK_EXEC__PIPE;
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
......@@ -452,19 +432,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddTaskToTaskSet(taskSourceLevel, pTask);
// source
pTask->isDataScan = 1;
pTask->taskLevel = TASK_LEVEL__SOURCE;
// add fixed vg dispatch
pTask->sinkType = TASK_SINK__NONE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......@@ -515,7 +492,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source
pTask->isDataScan = 1;
pTask->taskLevel = TASK_LEVEL__SOURCE;
// trigger
pTask->triggerParam = pStream->triggerParam;
......@@ -527,8 +504,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
mndAddSinkToTask(pMnode, pStream, pTask);
}
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......
......@@ -795,11 +795,12 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
pStb = mndAcquireStb(pMnode, pSma->stb);
if (pStb == NULL) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
mndTransSetSerial(pTrans);
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, pSma->name);
......@@ -834,9 +835,6 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
code = 0;
_OVER:
if(code != 0) {
ASSERT(0);
}
mndTransDrop(pTrans);
mndReleaseVgroup(pMnode, pVgroup);
mndReleaseStb(pMnode, pStb);
......@@ -855,6 +853,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
if (pIter == NULL) break;
if (pSma->stbUid == pStb->uid) {
mndTransSetSerial(pTrans);
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
......@@ -935,7 +934,6 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
goto _OVER;
} else {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
ASSERT(0);
goto _OVER;
}
}
......
......@@ -323,8 +323,7 @@ FAIL:
}
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
SEncoder encoder;
......@@ -548,7 +547,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
SStreamTask *pTask = taosArrayGetP(pTasks, 0);
if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(sz == 1);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
......@@ -564,8 +563,8 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (!pTask->isDataScan) break;
ASSERT(pTask->execType != TASK_EXEC__NONE);
if (pTask->taskLevel != TASK_LEVEL__SOURCE) break;
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) {
return -1;
}
......
......@@ -110,9 +110,6 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
pTask->pMsgCb = &pNode->msgCb;
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->isDataScan == 0);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
......
......@@ -604,8 +604,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
int32_t code = 0;
ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1);
if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) {
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
......@@ -624,32 +624,30 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
// exec
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
if (pTask->isDataScan) {
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
} else {
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
}
// expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor);
}
// sink
/*pTask->ahandle = pTq->pVnode;*/
if (pTask->sinkType == TASK_SINK__SMA) {
if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) {
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
......@@ -715,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
......
......@@ -416,7 +416,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->isDataScan) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
ASSERT(code == 0);
}
......
......@@ -2234,7 +2234,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
int32_t numOfRows = 0;
//int32_t numOfRows = 0;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -2263,7 +2263,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i);
colDataAppend(pDst, numOfRows, v, false);
//colDataAppend(pDst, numOfRows, v, false);
colDataAppend(pDst, pResBlock->info.rows, v, false);
}
pResBlock->info.rows += 1;
......@@ -2312,12 +2313,47 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
// add current row if timestamp match
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
char* v = colDataGetData(pSrc, i);
colDataAppend(pDst, pResBlock->info.rows, v, false);
}
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
}
//check if need to interpolate after ts range
while (pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pBlock->info.rows - 1, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
}
// restore the value
......@@ -2375,6 +2411,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......
......@@ -2298,7 +2298,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "derivative",
.type = FUNCTION_TYPE_DERIVATIVE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateDerivative,
.getEnvFunc = getDerivativeFuncEnv,
......
......@@ -31,7 +31,8 @@ extern "C" {
#define parserDebug(param, ...) qDebug("PARSER: " param, ##__VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, ##__VA_ARGS__)
#define PK_TS_COL_INTERNAL_NAME "_rowts"
#define ROWTS_PSEUDO_COLUMN_NAME "_rowts"
#define C0_PSEUDO_COLUMN_NAME "_c0"
typedef struct SMsgBuf {
int32_t len;
......
......@@ -443,19 +443,23 @@ SNode* createNotBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft,
createOperatorNode(pCxt, OP_TYPE_GREATER_THAN, nodesCloneNode(pExpr), pRight));
}
static SNode* createPrimaryKeyCol(SAstCreateContext* pCxt) {
static SNode* createPrimaryKeyCol(SAstCreateContext* pCxt, const SToken* pFuncName) {
CHECK_PARSER_STATUS(pCxt);
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_OUT_OF_MEM(pCol);
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
if (NULL == pFuncName) {
strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
} else {
strncpy(pCol->colName, pFuncName->z, pFuncName->n);
}
return (SNode*)pCol;
}
SNode* createFunctionNode(SAstCreateContext* pCxt, const SToken* pFuncName, SNodeList* pParameterList) {
CHECK_PARSER_STATUS(pCxt);
if (0 == strncasecmp("_rowts", pFuncName->z, pFuncName->n) || 0 == strncasecmp("_c0", pFuncName->z, pFuncName->n)) {
return createPrimaryKeyCol(pCxt);
return createPrimaryKeyCol(pCxt, pFuncName);
}
SFunctionNode* func = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(func);
......@@ -586,7 +590,7 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
CHECK_PARSER_STATUS(pCxt);
SStateWindowNode* state = (SStateWindowNode*)nodesMakeNode(QUERY_NODE_STATE_WINDOW);
CHECK_OUT_OF_MEM(state);
state->pCol = createPrimaryKeyCol(pCxt);
state->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == state->pCol) {
nodesDestroyNode((SNode*)state);
CHECK_OUT_OF_MEM(state->pCol);
......@@ -600,7 +604,7 @@ SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode
CHECK_PARSER_STATUS(pCxt);
SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
CHECK_OUT_OF_MEM(interval);
interval->pCol = createPrimaryKeyCol(pCxt);
interval->pCol = createPrimaryKeyCol(pCxt, NULL);
if (NULL == interval->pCol) {
nodesDestroyNode((SNode*)interval);
CHECK_OUT_OF_MEM(interval->pCol);
......@@ -639,7 +643,7 @@ SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode) {
SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd) {
CHECK_PARSER_STATUS(pCxt);
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt), pStart, pEnd);
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt, NULL), pStart, pEnd);
}
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) {
......@@ -752,7 +756,7 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) {
SFillNode* pFillClause = (SFillNode*)pFill;
nodesDestroyNode(pFillClause->pWStartTs);
pFillClause->pWStartTs = createPrimaryKeyCol(pCxt);
pFillClause->pWStartTs = createPrimaryKeyCol(pCxt, NULL);
((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause;
}
return pStmt;
......@@ -1731,7 +1735,7 @@ SNode* createCountFuncForDelete(SAstCreateContext* pCxt) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(pFunc);
strcpy(pFunc->functionName, "count");
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt))) {
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
nodesDestroyNode((SNode*)pFunc);
CHECK_OUT_OF_MEM(NULL);
}
......
......@@ -612,7 +612,8 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
}
static bool isInternalPrimaryKey(const SColumnNode* pCol) {
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME);
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId &&
(0 == strcmp(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME) || 0 == strcmp(pCol->colName, C0_PSEUDO_COLUMN_NAME));
}
static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, const STableNode* pTable,
......@@ -2566,7 +2567,7 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
pFill->pWStartTs = (SNode*)pCol;
*pOutput = (SNode*)pFill;
......@@ -2652,7 +2653,7 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
strcpy(pCol->colName, ROWTS_PSEUDO_COLUMN_NAME);
bool found = false;
int32_t code = findAndSetColumn(pCxt, &pCol, pTable, &found);
if (TSDB_CODE_SUCCESS != code || !found) {
......@@ -3878,7 +3879,7 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
return TSDB_CODE_OUT_OF_MEMORY;
}
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
strcpy(((SColumnNode*)pInterval->pCol)->colName, ROWTS_PSEUDO_COLUMN_NAME);
pCxt->createStream = true;
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
......
......@@ -436,8 +436,8 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p
SNode* pPrimaryKeyCond = NULL;
SNode* pOtherCond = NULL;
int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond,
&pOtherCond);
int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond,
&pScan->pTagCond, &pOtherCond);
if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) {
code = pushDownCondOptRebuildTbanme(&pScan->pTagCond);
}
......@@ -1711,7 +1711,7 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild,
if (!cxt.canUse) return false;
}
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && NULL != ((SJoinLogicNode*)pChild)->pOnConditions) {
SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild;
SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild;
CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false};
nodesWalkExpr(pJoinLogicNode->pOnConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
if (!cxt.canUse) return false;
......@@ -1768,7 +1768,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
nodesDestroyNode((SNode*)pProjectNode);
//if pChild is a project logic node, remove its projection which is not reference by its target.
// if pChild is a project logic node, remove its projection which is not reference by its target.
alignProjectionWithTarget(pChild);
}
pCxt->optimized = true;
......@@ -2404,6 +2404,9 @@ static const SOptimizeRule optimizeRuleSet[] = {
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
......
......@@ -264,7 +264,7 @@ static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
return streamQuery ? false : stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN:
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
......@@ -1423,6 +1423,9 @@ static const SSplitRule splitRuleSet[] = {
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
......
......@@ -19,6 +19,9 @@
#include "scalar.h"
static void dumpQueryPlan(SQueryPlan* pPlan) {
if (0 == (qDebugFlag & DEBUG_DEBUG)) {
return;
}
char* pStr = NULL;
nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
......@@ -42,6 +45,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
}
if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan);
}
nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan);
......@@ -79,6 +85,7 @@ static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDown
}
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.groupId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
}
......
......@@ -3246,6 +3246,10 @@ _return:
}
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (info->scalarMode) {
return true;
}
if (FILTER_EMPTY_RES(info)) {
return false;
}
......
......@@ -65,7 +65,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
}
trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
streamTaskInput(pTask, (SStreamQueueItem*)trigger);
streamSchedExec(pTask);
......@@ -77,7 +77,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
}
return 0;
}
......@@ -186,7 +186,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
if (exec) {
streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
} else {
......@@ -201,7 +201,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
qDebug("task %d receive dispatch rsp", pTask->taskId);
if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp);
if (leftRsp > 0) return 0;
......@@ -222,7 +222,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
return 0;
......@@ -250,7 +250,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
streamProcessRunReq(pTask);
if (pTask->isDataScan) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// scan data to recover
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
pTask->taskStatus = TASK_STATUS__RECOVERING;
......@@ -272,12 +272,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
streamSchedExec(pTask);
/*streamTryExec(pTask);*/
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
/*streamDispatch(pTask);*/
return 0;
......
......@@ -242,7 +242,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
int32_t blockNum = taosArrayGetSize(pData->blocks);
ASSERT(blockNum != 0);
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.dataSrcVgId = pData->srcVgId,
......@@ -282,7 +282,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
taosArrayDestroy(req.dataLen);
return code;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
......@@ -393,11 +393,11 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
int32_t vgId = 0;
int32_t downstreamTaskId = 0;
// find ep
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
vgId = pTask->fixedEpDispatcher.nodeId;
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// TODO get ctbName for each block
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
......@@ -439,8 +439,7 @@ FAIL:
}
int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
ASSERT(pTask->sinkType == TASK_SINK__NONE);
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
......
......@@ -24,7 +24,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
......@@ -92,7 +92,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
}
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
void* exec = pTask->exec.executor;
......@@ -139,8 +139,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
return -1;
}
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
}
......@@ -161,7 +160,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
......@@ -187,7 +186,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
if (pTask->execType == TASK_EXEC__NONE) {
if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data);
continue;
......
......@@ -52,15 +52,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
return pMeta;
_err:
return NULL;
}
void streamMetaClose(SStreamMeta* pMeta) {
//
return;
tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb);
tdbClose(pMeta->db);
}
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
......@@ -123,13 +124,32 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaRollBack(SStreamMeta* pMeta) {
// TODO tdb rollback
int32_t streamMetaAbort(SStreamMeta* pMeta) {
if (tdbAbort(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamRestoreTask(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
......@@ -153,6 +173,18 @@ int32_t streamRestoreTask(SStreamMeta* pMeta) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
return -1;
}
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
return -1;
}
}
if (tdbTbcClose(pCur) < 0) {
return -1;
}
return 0;
......
......@@ -88,14 +88,15 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
}
int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
#if 0
if (pTask->taskStatus != TASK_STATUS__FAIL) {
return 0;
}
if (pTask->isStreamDistributed) {
if (pTask->isDataScan) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
} else if (pTask->execType != TASK_EXEC__NONE) {
} else if (pTask->taskType != TASK_TYPE__SINK) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
bool hasCheckpoint = false;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
......@@ -113,7 +114,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
}
} else {
if (pTask->isDataScan) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
......@@ -133,5 +134,6 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq*
}
}
#endif
return 0;
}
......@@ -52,10 +52,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
......@@ -73,27 +71,23 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType == TASK_SINK__TABLE) {
if (pTask->outputType == TASK_OUTPUT__TABLE) {
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
} else if (pTask->outputType == TASK_OUTPUT__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
......@@ -107,10 +101,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
......@@ -131,29 +123,25 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosArrayPush(pTask->childEpInfo, &pInfo);
}
if (pTask->execType != TASK_EXEC__NONE) {
if (pTask->taskLevel != TASK_LEVEL__SINK) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType == TASK_SINK__TABLE) {
if (pTask->outputType == TASK_OUTPUT__TABLE) {
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
} else if (pTask->outputType == TASK_OUTPUT__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
} else if (pTask->outputType == TASK_OUTPUT__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
}
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 8,
"thread_count_create_tbl": 8,
"create_table_thread_count": 8,
"result_file": "./tpl_insert_result_tpl",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 1,
"create_table_thread_count": 1,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"databases": [{
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -185,7 +185,7 @@ class TDTestCase:
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -168,7 +168,7 @@ class JoinPerf:
"user": self.user,
"password": self.password,
"thread_count": cpu_count(),
"thread_count_create_tbl": cpu_count(),
"create_table_thread_count": cpu_count(),
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -172,7 +172,7 @@ class Taosdemo:
"user": self.user,
"password": self.password,
"thread_count": cpu_count(),
"thread_count_create_tbl": cpu_count(),
"create_table_thread_count": cpu_count(),
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file":"./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -133,7 +133,7 @@ class TDTestCase:
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 5000,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -7,7 +7,7 @@
"password": "taosdata",
"thread_count": 2,
"num_of_records_per_req": 10,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"databases": [{
"dbinfo": {
"name": "db01",
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 10,
"thread_count_create_tbl": 10,
"create_table_thread_count": 10,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file":"./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 100,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file":"./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file":"./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
......@@ -6,7 +6,7 @@
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"create_table_thread_count": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册