Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bd84ecc3
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bd84ecc3
编写于
11月 28, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:[TD-20613] add interface for write raw block with sdfasdf
上级
c557f3e6
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
210 addition
and
2 deletion
+210
-2
include/client/taos.h
include/client/taos.h
+1
-0
include/os/os.h
include/os/os.h
+2
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientRawBlockWrite.c
source/client/src/clientRawBlockWrite.c
+206
-0
未找到文件。
include/client/taos.h
浏览文件 @
bd84ecc3
...
...
@@ -297,6 +297,7 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
);
DLL_EXPORT
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
);
DLL_EXPORT
int
taos_write_raw_block
(
TAOS
*
taos
,
int
numOfRows
,
char
*
pData
,
const
char
*
tbname
);
DLL_EXPORT
int
taos_write_raw_block_with_fields
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
,
TAOS_FIELD
*
fields
,
int
numFields
);
DLL_EXPORT
void
tmq_free_raw
(
tmq_raw_data
raw
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
...
...
include/os/os.h
浏览文件 @
bd84ecc3
...
...
@@ -43,12 +43,13 @@ extern "C" {
#include <sys/utsname.h>
#include <sys/wait.h>
#include <termios.h>
#include <cpuid.h>
#if defined(DARWIN)
#else
#include <argp.h>
#include <sys/prctl.h>
#include <cpuid.h>
#endif
#else
...
...
source/client/src/clientImpl.c
浏览文件 @
bd84ecc3
...
...
@@ -1645,7 +1645,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
static
int32_t
estimateJsonLen
(
SReqResultInfo
*
pResultInfo
,
int32_t
numOfCols
,
int32_t
numOfRows
)
{
char
*
p
=
(
char
*
)
pResultInfo
->
pData
;
//
version + length + numOfRows + numOfCol + groupId + flag_segment + column_info
//
| version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
int32_t
len
=
getVersion1BlockMetaSize
(
p
,
numOfCols
);
int32_t
*
colLength
=
(
int32_t
*
)(
p
+
len
);
len
+=
sizeof
(
int32_t
)
*
numOfCols
;
...
...
source/client/src/clientRawBlockWrite.c
浏览文件 @
bd84ecc3
...
...
@@ -1211,6 +1211,211 @@ static void destroyVgHash(void* data) {
taosMemoryFreeClear
(
vgData
->
data
);
}
int
taos_write_raw_block_with_fields
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
,
TAOS_FIELD
*
fields
,
int
numFields
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SSubmitReq
*
subReq
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
,
0
);
if
(
!
pRequest
)
{
uError
(
"WriteRaw:createRequest error request is null"
);
code
=
terrno
;
goto
end
;
}
pRequest
->
syncQuery
=
true
;
if
(
!
pRequest
->
pDb
)
{
uError
(
"WriteRaw:not use db"
);
code
=
TSDB_CODE_PAR_DB_NOT_SPECIFIED
;
goto
end
;
}
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
tstrncpy
(
pName
.
dbname
,
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
));
tstrncpy
(
pName
.
tname
,
tbname
,
sizeof
(
pName
.
tname
));
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw: get gatlog error"
);
goto
end
;
}
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
conn
.
requestId
=
pRequest
->
requestId
;
conn
.
requestObjRefId
=
pRequest
->
self
;
conn
.
mgmtEps
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SVgroupInfo
vgData
=
{
0
};
code
=
catalogGetTableHashVgroup
(
pCatalog
,
&
conn
,
&
pName
,
&
vgData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableHashVgroup failed. table name: %s"
,
tbname
);
goto
end
;
}
code
=
catalogGetTableMeta
(
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw:catalogGetTableMeta failed. table name: %s"
,
tbname
);
goto
end
;
}
uint64_t
suid
=
(
TSDB_NORMAL_TABLE
==
pTableMeta
->
tableType
?
0
:
pTableMeta
->
suid
);
uint64_t
uid
=
pTableMeta
->
uid
;
int32_t
numOfCols
=
numFields
;
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
k
=
0
;
k
<
numOfCols
;
k
++
){
for
(
int
i
=
0
;
i
<
pTableMeta
->
tableInfo
.
numOfColumns
;
i
++
)
{
SSchema
*
schema
=
pTableMeta
->
schema
+
i
;
if
(
strcmp
(
schema
->
name
,
fields
[
k
].
name
)
!=
0
)
continue
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
}
}
}
fLen
-=
sizeof
(
TSKEY
);
int32_t
extendedRowSize
=
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
nVar
*
sizeof
(
VarDataOffsetT
)
+
(
int32_t
)
TD_BITMAP_BYTES
(
numOfCols
-
1
);
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pTableMeta
->
sversion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
fLen
);
int32_t
dataLen
=
0
;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char
*
pStart
=
pData
+
getVersion1BlockMetaSize
(
pData
,
numOfCols
);
int32_t
*
colLength
=
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
)
*
numOfCols
;
SResultColumn
*
pCol
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SResultColumn
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
(
pTableMeta
->
schema
[
i
].
type
))
{
pCol
[
i
].
offset
=
(
int32_t
*
)
pStart
;
pStart
+=
rows
*
sizeof
(
int32_t
);
}
else
{
pCol
[
i
].
nullbitmap
=
pStart
;
pStart
+=
BitmapLen
(
rows
);
}
pCol
[
i
].
pData
=
pStart
;
pStart
+=
colLength
[
i
];
}
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
tdSRowResetBuf
(
&
rb
,
rowData
);
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
numOfCols
;
k
++
)
{
const
SSchema
*
pColumn
=
NULL
;
for
(
int
i
=
0
;
i
<
pTableMeta
->
tableInfo
.
numOfColumns
;
i
++
)
{
if
(
strcmp
((
pTableMeta
->
schema
+
i
)
->
name
,
fields
[
k
].
name
)
==
0
)
{
pColumn
=
pTableMeta
->
schema
+
i
;
break
;
}
}
if
(
pColumn
==
NULL
){
uError
(
"column not exist:%s"
,
fields
[
k
].
name
);
code
=
TSDB_CODE_INVALID_PARA
;
goto
end
;
}
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
if
(
pCol
[
k
].
offset
[
j
]
!=
-
1
)
{
char
*
data
=
pCol
[
k
].
pData
+
pCol
[
k
].
offset
[
j
];
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
}
else
{
if
(
!
colDataIsNull_f
(
pCol
[
k
].
nullbitmap
,
j
))
{
char
*
data
=
pCol
[
k
].
pData
+
pColumn
->
bytes
*
j
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
}
if
(
pColumn
->
colId
!=
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
offset
+=
TYPE_BYTES
[
pColumn
->
type
];
}
}
tdSRowEnd
(
&
rb
);
int32_t
rowLen
=
TD_ROW_LEN
(
rowData
);
rowData
=
POINTER_SHIFT
(
rowData
,
rowLen
);
dataLen
+=
rowLen
;
}
taosMemoryFree
(
pCol
);
blk
->
uid
=
htobe64
(
uid
);
blk
->
suid
=
htobe64
(
suid
);
blk
->
sversion
=
htonl
(
pTableMeta
->
sversion
);
blk
->
schemaLen
=
htonl
(
schemaLen
);
blk
->
numOfRows
=
htonl
(
rows
);
blk
->
dataLen
=
htonl
(
dataLen
);
subReq
->
length
=
sizeof
(
SSubmitReq
)
+
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
subReq
->
numOfBlocks
=
1
;
pQuery
=
(
SQuery
*
)
nodesMakeNode
(
QUERY_NODE_QUERY
);
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
pQuery
->
pRoot
)
{
uError
(
"create pQuery->pRoot error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
nodeStmt
->
pDataBlocks
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
dst
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
dst
->
vg
=
vgData
;
dst
->
numOfTables
=
subReq
->
numOfBlocks
;
dst
->
size
=
subReq
->
length
;
dst
->
pData
=
(
char
*
)
subReq
;
subReq
->
header
.
vgId
=
htonl
(
dst
->
vg
.
vgId
);
subReq
->
version
=
htonl
(
1
);
subReq
->
header
.
contLen
=
htonl
(
subReq
->
length
);
subReq
->
length
=
htonl
(
subReq
->
length
);
subReq
->
numOfBlocks
=
htonl
(
subReq
->
numOfBlocks
);
subReq
=
NULL
;
// no need free
taosArrayPush
(
nodeStmt
->
pDataBlocks
,
&
dst
);
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
taosMemoryFree
(
subReq
);
return
code
;
}
int
taos_write_raw_block
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
...
...
@@ -1293,6 +1498,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
fLen
);
int32_t
dataLen
=
0
;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char
*
pStart
=
pData
+
getVersion1BlockMetaSize
(
pData
,
numOfCols
);
int32_t
*
colLength
=
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
)
*
numOfCols
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录