Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
a7040bed
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a7040bed
编写于
3月 21, 2020
作者:
S
slguan
提交者:
GitHub
3月 21, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1398 from taosdata/liaohj_2
[td-32] fix bugs in inserting data
上级
b0385c7a
83ea175d
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
58 addition
and
59 deletion
+58
-59
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-2
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+11
-16
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+17
-16
src/client/src/tscSql.c
src/client/src/tscSql.c
+3
-1
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+0
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+4
-3
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+3
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+14
-13
src/mnode/src/mgmtChildTable.c
src/mnode/src/mgmtChildTable.c
+1
-1
src/mnode/src/mgmtNormalTable.c
src/mnode/src/mgmtNormalTable.c
+1
-1
src/mnode/src/mgmtSuperTable.c
src/mnode/src/mgmtSuperTable.c
+1
-1
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
a7040bed
...
...
@@ -62,7 +62,7 @@ typedef struct STableMeta {
int8_t
numOfVpeers
;
int16_t
sversion
;
SVnodeDesc
vpeerDesc
[
TSDB_VNODES_SUPPORT
];
int32_t
vg
i
d
;
// virtual group id, which current table belongs to
int32_t
vg
I
d
;
// virtual group id, which current table belongs to
int32_t
sid
;
// the index of one table in a virtual node
uint64_t
uid
;
// unique id of a table
SSchema
schema
[];
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
...
...
@@ -182,7 +182,7 @@ typedef struct STableDataBlocks {
char
tableId
[
TSDB_TABLE_ID_LEN
];
int8_t
tsSource
;
// where does the UNIX timestamp come from, server or client
bool
ordered
;
// if current rows are ordered or not
int64_t
vg
i
d
;
// virtual group id
int64_t
vg
I
d
;
// virtual group id
int64_t
prevTS
;
// previous timestamp, recorded to decide if the records array is ts ascending
int32_t
numOfTables
;
// number of tables in current submit block
...
...
src/client/src/tscParseInsert.c
浏览文件 @
a7040bed
...
...
@@ -698,7 +698,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SShellSubmitBlock
*
pBlocks
=
(
SShellSubmitBlock
*
)(
dataBuf
->
pData
);
tsSetBlockInfo
(
pBlocks
,
pTableMeta
,
numOfRows
);
dataBuf
->
vg
id
=
pTableMeta
->
vgi
d
;
dataBuf
->
vg
Id
=
pTableMeta
->
vgI
d
;
dataBuf
->
numOfTables
=
1
;
/*
...
...
@@ -1058,7 +1058,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
}
void
*
fp
=
pSql
->
fp
;
ptrdiff_t
pos
=
pSql
->
asyncTblPos
-
pSql
->
sqlstr
;
if
((
code
=
tscCheckIfCreateTable
(
&
str
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1068,17 +1067,15 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position.
*/
if
(
fp
!=
NULL
)
{
if
(
TSDB_CODE_ACTION_IN_PROGRESS
==
code
)
{
tscTrace
(
"async insert and waiting to get meter meta, then continue parse sql from offset: %"
PRId64
,
pos
);
return
code
;
}
// todo add to return
tscError
(
"async insert parse error, code:%d, %s"
,
code
,
tstrerror
(
code
));
pSql
->
asyncTblPos
=
NULL
;
if
(
TSDB_CODE_ACTION_IN_PROGRESS
==
code
)
{
tscTrace
(
"async insert and waiting to get meter meta, then continue parse sql from offset: %"
PRId64
,
pos
);
return
code
;
}
// todo add to return
tscError
(
"async insert parse error, code:%d, %s"
,
code
,
tstrerror
(
code
));
pSql
->
asyncTblPos
=
NULL
;
goto
_error_clean
;
// TODO: should _clean or _error_clean to async flow ????
}
...
...
@@ -1096,15 +1093,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
}
int32_t
numOfCols
=
tscGetNumOfTags
(
pTableMetaInfo
->
pTableMeta
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
if
(
sToken
.
type
==
TK_VALUES
)
{
SParsedDataColInfo
spd
=
{.
numOfCols
=
numOfCol
s
};
SParsedDataColInfo
spd
=
{.
numOfCols
=
tinfo
.
numOfColumn
s
};
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
);
tscSetAssignedColumnInfo
(
&
spd
,
pSchema
,
numOfCols
);
tscSetAssignedColumnInfo
(
&
spd
,
pSchema
,
tinfo
.
numOfColumns
);
if
(
validateDataSource
(
pCmd
,
DATA_FROM_SQL_STRING
,
sToken
.
z
)
!=
TSDB_CODE_SUCCESS
)
{
goto
_error_clean
;
...
...
@@ -1243,7 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
// submit to more than one vnode
if
(
pCmd
->
pDataBlocks
->
nSize
>
0
)
{
// merge according to vg
i
d
// merge according to vg
I
d
if
((
code
=
tscMergeTableDataBlocks
(
pSql
,
pCmd
->
pDataBlocks
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error_clean
;
}
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
a7040bed
...
...
@@ -165,7 +165,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta
->
sid
=
pTableMetaMsg
->
sid
;
pTableMeta
->
uid
=
pTableMetaMsg
->
uid
;
pTableMeta
->
vg
id
=
pTableMetaMsg
->
vgi
d
;
pTableMeta
->
vg
Id
=
pTableMetaMsg
->
vgI
d
;
pTableMeta
->
numOfVpeers
=
pTableMetaMsg
->
numOfVpeers
;
memcpy
(
pTableMeta
->
vpeerDesc
,
pTableMetaMsg
->
vpeerDesc
,
sizeof
(
SVnodeDesc
)
*
pTableMeta
->
numOfVpeers
);
...
...
src/client/src/tscServer.c
浏览文件 @
a7040bed
...
...
@@ -341,11 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
* the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
*/
bool
shouldFree
=
tscShouldFreeAsyncSqlObj
(
pSql
);
if
(
command
==
TSDB_SQL_INSERT
)
{
// handle multi-vnode insertion situation
(
*
pSql
->
fp
)(
pSql
,
taosres
,
rpcMsg
->
code
);
}
else
{
(
*
pSql
->
fp
)(
pSql
->
param
,
taosres
,
rpcMsg
->
code
);
}
(
*
pSql
->
fp
)(
pSql
->
param
,
taosres
,
rpcMsg
->
code
);
if
(
shouldFree
)
{
// If it is failed, all objects allocated during execution taos_connect_a should be released
...
...
@@ -539,22 +535,27 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char
*
pMsg
,
*
pStart
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
pStart
=
pSql
->
cmd
.
payload
+
tsRpcHeadSize
;
pMsg
=
pStart
;
pShellMsg
=
(
SShellSubmitMsg
*
)
pMsg
;
pShellMsg
->
desc
.
numOfVnodes
=
htonl
(
1
);
pShellMsg
->
import
=
htons
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
)
?
0
:
1
);
pShellMsg
->
vnode
=
0
;
//htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
pShellMsg
->
numOfSid
=
htonl
(
pSql
->
cmd
.
numOfTablesInSubmit
);
// number of meters to be inserted
pShellMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pShellMsg
->
header
.
contLen
=
htonl
(
pSql
->
cmd
.
payloadLen
);
pShellMsg
->
numOfTables
=
htonl
(
pSql
->
cmd
.
numOfTablesInSubmit
);
// number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql
->
cmd
.
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
// htons(pShellMsg->vnode));
pSql
->
cmd
.
payloadLen
=
sizeof
(
SShellSubmitMsg
);
//
pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -676,7 +677,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
uid
=
pTableMeta
->
uid
;
pQueryMsg
->
numOfTagsCols
=
0
;
pQueryMsg
->
vgId
=
htonl
(
pTableMeta
->
vg
i
d
);
pQueryMsg
->
vgId
=
htonl
(
pTableMeta
->
vg
I
d
);
tscTrace
(
"%p queried tables:%d, table id: %s"
,
pSql
,
1
,
pTableMetaInfo
->
name
);
}
else
{
// query on super table
if
(
pTableMetaInfo
->
vnodeIndex
<
0
)
{
...
...
@@ -1849,12 +1850,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg
->
sid
=
htonl
(
pMetaMsg
->
sid
);
pMetaMsg
->
sversion
=
htons
(
pMetaMsg
->
sversion
);
pMetaMsg
->
vg
id
=
htonl
(
pMetaMsg
->
vgi
d
);
pMetaMsg
->
vg
Id
=
htonl
(
pMetaMsg
->
vgI
d
);
pMetaMsg
->
uid
=
htobe64
(
pMetaMsg
->
uid
);
pMetaMsg
->
contLen
=
htons
(
pMetaMsg
->
contLen
);
if
(
pMetaMsg
->
sid
<
0
||
pMetaMsg
->
vg
i
d
<
0
)
{
tscError
(
"invalid meter vg
id:%d, sid%d"
,
pMetaMsg
->
vgi
d
,
pMetaMsg
->
sid
);
if
(
pMetaMsg
->
sid
<
0
||
pMetaMsg
->
vg
I
d
<
0
)
{
tscError
(
"invalid meter vg
Id:%d, sid%d"
,
pMetaMsg
->
vgI
d
,
pMetaMsg
->
sid
);
return
TSDB_CODE_INVALID_VALUE
;
}
...
...
@@ -1948,11 +1949,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
pMeta
->
sid
=
htonl
(
pMeta
->
sid
);
pMeta
->
sversion
=
htons
(
pMeta
->
sversion
);
pMeta
->
vg
id
=
htonl
(
pMeta
->
vgi
d
);
pMeta
->
vg
Id
=
htonl
(
pMeta
->
vgI
d
);
pMeta
->
uid
=
htobe64
(
pMeta
->
uid
);
if
(
pMeta
->
sid
<=
0
||
pMeta
->
vg
i
d
<
0
)
{
tscError
(
"invalid meter vg
id:%d, sid%d"
,
pMeta
->
vgi
d
,
pMeta
->
sid
);
if
(
pMeta
->
sid
<=
0
||
pMeta
->
vg
I
d
<
0
)
{
tscError
(
"invalid meter vg
Id:%d, sid%d"
,
pMeta
->
vgI
d
,
pMeta
->
sid
);
pSql
->
res
.
code
=
TSDB_CODE_INVALID_VALUE
;
pSql
->
res
.
numOfTotal
=
i
;
return
TSDB_CODE_OTHERS
;
...
...
src/client/src/tscSql.c
浏览文件 @
a7040bed
...
...
@@ -130,7 +130,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql
->
pTscObj
=
pObj
;
pSql
->
signature
=
pSql
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
// tsem_init(&pSql->emptyRspSem, 0, 1);
pObj
->
pSql
=
pSql
;
pSql
->
fp
=
fp
;
pSql
->
param
=
param
;
...
...
@@ -146,6 +146,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return
NULL
;
}
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize
=
tsRpcHeadSize
+
sizeof
(
SShellSubmitMsg
);
return
pObj
;
}
...
...
src/client/src/tscSystem.c
浏览文件 @
a7040bed
...
...
@@ -34,7 +34,6 @@ void * pTscMgmtConn;
void
*
pSlaveConn
;
void
*
tscCacheHandle
;
int32_t
globalCode
=
0
;
int
initialized
=
0
;
int
slaveIndex
;
void
*
tscTmr
;
void
*
tscQhandle
;
...
...
@@ -187,9 +186,7 @@ void taos_init_imp() {
if
(
tscCacheHandle
==
NULL
)
tscCacheHandle
=
taosCacheInit
(
tscTmr
,
refreshTime
);
initialized
=
1
;
tscTrace
(
"client is initialized successfully"
);
tsInsertHeadSize
=
tsRpcHeadSize
+
sizeof
(
SShellSubmitMsg
);
}
void
taos_init
()
{
pthread_once
(
&
tscinit
,
taos_init_imp
);
}
...
...
src/client/src/tscUtil.c
浏览文件 @
a7040bed
...
...
@@ -614,7 +614,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
*/
pCmd
->
payloadLen
=
pDataBlock
->
nAllocSize
-
tsRpcHeadSize
;
assert
(
pCmd
->
allocSize
>=
pCmd
->
payloadLen
+
tsRpcHeadSize
+
100
);
assert
(
pCmd
->
allocSize
>=
pCmd
->
payloadLen
+
tsRpcHeadSize
+
100
&&
pCmd
->
payloadLen
>
0
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -705,8 +705,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
STableDataBlocks
*
pOneTableBlock
=
pTableDataBlockList
->
pData
[
i
];
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
tscGetDataBlockFromList
(
pVnodeDataBlockHashList
,
pVnodeDataBlockList
,
pOneTableBlock
->
vgid
,
TSDB_PAYLOAD_SIZE
,
int32_t
ret
=
tscGetDataBlockFromList
(
pVnodeDataBlockHashList
,
pVnodeDataBlockList
,
pOneTableBlock
->
vgId
,
TSDB_PAYLOAD_SIZE
,
tsInsertHeadSize
,
0
,
pOneTableBlock
->
tableId
,
pOneTableBlock
->
pTableMeta
,
&
dataBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p failed to prepare the data block buffer for merging table data, code:%d"
,
pSql
,
ret
);
...
...
src/dnode/src/dnodeWrite.c
浏览文件 @
a7040bed
...
...
@@ -275,7 +275,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
pRsp
->
numOfRows
=
htonl
(
1
);
pRsp
->
affectedRows
=
htonl
(
1
);
pRsp
->
numOfFailedBlocks
=
0
;
// todo write to tsdb
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
pCont
=
pRsp
,
...
...
src/inc/taosmsg.h
浏览文件 @
a7040bed
...
...
@@ -198,10 +198,20 @@ typedef struct {
}
SShellSubmitBlock
;
typedef
struct
{
int32_t
numOfVnodes
;
}
SMsgDesc
;
typedef
struct
SMsgHead
{
int32_t
contLen
;
int32_t
vgId
;
}
SMsgHead
;
typedef
struct
{
SMsgDesc
desc
;
SMsgHead
header
;
int16_t
import
;
int16_t
vnode
;
int32_t
numOfSid
;
/* total number of sid */
char
blks
[];
/* numOfSid blocks, each blocks for one table */
int32_t
numOfTables
;
// total number of sid
char
blks
[];
// number of data blocks, each table has at least one data block
}
SShellSubmitMsg
;
typedef
struct
{
...
...
@@ -232,15 +242,6 @@ typedef struct {
uint32_t
ip
;
}
SVnodeDesc
;
typedef
struct
{
int32_t
numOfVnodes
;
}
SMsgDesc
;
typedef
struct
{
int32_t
contLen
;
int32_t
vgId
;
}
SMsgHead
;
typedef
struct
{
int32_t
contLen
;
int32_t
vgId
;
...
...
@@ -688,7 +689,7 @@ typedef struct STableMetaMsg {
int8_t
numOfVpeers
;
SVnodeDesc
vpeerDesc
[
TSDB_VNODES_SUPPORT
];
int32_t
sid
;
int32_t
vg
i
d
;
int32_t
vg
I
d
;
uint64_t
uid
;
SSchema
schema
[];
}
STableMetaMsg
;
...
...
src/mnode/src/mgmtChildTable.c
浏览文件 @
a7040bed
...
...
@@ -445,7 +445,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
int32_t
mgmtGetChildTableMeta
(
SDbObj
*
pDb
,
SChildTableObj
*
pTable
,
STableMetaMsg
*
pMeta
,
bool
usePublicIp
)
{
pMeta
->
uid
=
htobe64
(
pTable
->
uid
);
pMeta
->
sid
=
htonl
(
pTable
->
sid
);
pMeta
->
vg
i
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
vg
I
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
sversion
=
htons
(
pTable
->
superTable
->
sversion
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
pMeta
->
numOfTags
=
pTable
->
superTable
->
numOfTags
;
...
...
src/mnode/src/mgmtNormalTable.c
浏览文件 @
a7040bed
...
...
@@ -524,7 +524,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *p
int32_t
mgmtGetNormalTableMeta
(
SDbObj
*
pDb
,
SNormalTableObj
*
pTable
,
STableMetaMsg
*
pMeta
,
bool
usePublicIp
)
{
pMeta
->
uid
=
htobe64
(
pTable
->
uid
);
pMeta
->
sid
=
htonl
(
pTable
->
sid
);
pMeta
->
vg
i
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
vg
I
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
sversion
=
htons
(
pTable
->
sversion
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
pMeta
->
numOfTags
=
0
;
...
...
src/mnode/src/mgmtSuperTable.c
浏览文件 @
a7040bed
...
...
@@ -654,7 +654,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t
mgmtGetSuperTableMeta
(
SDbObj
*
pDb
,
SSuperTableObj
*
pTable
,
STableMetaMsg
*
pMeta
,
bool
usePublicIp
)
{
pMeta
->
uid
=
htobe64
(
pTable
->
uid
);
pMeta
->
sid
=
htonl
(
pTable
->
sid
);
pMeta
->
vg
i
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
vg
I
d
=
htonl
(
pTable
->
vgId
);
pMeta
->
sversion
=
htons
(
pTable
->
sversion
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
pMeta
->
numOfTags
=
pTable
->
numOfTags
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录