Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3ace0b15
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3ace0b15
编写于
8月 03, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(tmq): race condition
上级
e345023d
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
94 addition
and
92 deletion
+94
-92
source/client/src/tmq.c
source/client/src/tmq.c
+94
-92
未找到文件。
source/client/src/tmq.c
浏览文件 @
3ace0b15
...
...
@@ -504,15 +504,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo
->
requestId
=
generateRequestId
();
pMsgSendInfo
->
requestObjRefId
=
0
;
pMsgSendInfo
->
param
=
pParam
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
fp
=
tmqCommitCb2
;
pMsgSendInfo
->
msgType
=
TDMT_VND_MQ_COMMIT_OFFSET
;
// send msg
atomic_add_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
atomic_add_fetch_32
(
&
pParamSet
->
totalRspNum
,
1
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
pParamSet
->
waitingRspNum
++
;
pParamSet
->
totalRspNum
++
;
return
0
;
}
...
...
@@ -2196,7 +2197,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
cJSON
*
tvalue
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
pTagVal
->
type
))
{
char
*
buf
=
taosMemoryCalloc
(
pTagVal
->
nData
+
3
,
1
);
if
(
!
buf
)
goto
end
;
if
(
!
buf
)
goto
end
;
dataConverToStr
(
buf
,
pTagVal
->
type
,
pTagVal
->
pData
,
pTagVal
->
nData
,
NULL
);
tvalue
=
cJSON_CreateString
(
buf
);
taosMemoryFree
(
buf
);
...
...
@@ -2506,8 +2507,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
launchQueryImpl
(
pRequest
,
&
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
SCatalog
*
pCatalog
=
NULL
;
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveTableMeta
(
pCatalog
,
&
tableName
);
}
...
...
@@ -2575,8 +2576,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
launchQueryImpl
(
pRequest
,
&
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
SCatalog
*
pCatalog
=
NULL
;
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
SCatalog
*
pCatalog
=
NULL
;
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
catalogRemoveTableMeta
(
pCatalog
,
&
tableName
);
}
...
...
@@ -2695,7 +2696,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
}
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
removeMeta
(
pTscObj
,
pRequest
->
tableList
);
}
...
...
@@ -2812,7 +2813,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
}
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
){
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
removeMeta
(
pTscObj
,
pRequest
->
tableList
);
}
code
=
pRequest
->
code
;
...
...
@@ -2827,7 +2828,7 @@ end:
// delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where ..
//static void getTbName(char *sql){
//
static void getTbName(char *sql){
// char *ch = sql;
//
// bool inBackQuote = false;
...
...
@@ -2858,9 +2859,9 @@ end:
//}
static
int32_t
taosDeleteData
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SDeleteRes
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SDeleteRes
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
// decode and process req
void
*
data
=
POINTER_SHIFT
(
meta
,
sizeof
(
SMsgHead
));
...
...
@@ -2871,13 +2872,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
goto
end
;
}
// getTbName(req.tableFName);
// getTbName(req.tableFName);
char
sql
[
256
]
=
{
0
};
sprintf
(
sql
,
"delete from `%s` where `%s` >= %"
PRId64
" and `%s` <= %"
PRId64
,
req
.
tableFName
,
req
.
tsColName
,
req
.
skey
,
req
.
tsColName
,
req
.
ekey
);
sprintf
(
sql
,
"delete from `%s` where `%s` >= %"
PRId64
" and `%s` <= %"
PRId64
,
req
.
tableFName
,
req
.
tsColName
,
req
.
skey
,
req
.
tsColName
,
req
.
ekey
);
printf
(
"delete sql:%s
\n
"
,
sql
);
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
code
=
pRequest
->
code
;
if
(
code
==
TSDB_CODE_PAR_TABLE_NOT_EXIST
)
{
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2985,9 +2987,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code
=
TSDB_CODE_SUCCESS
;
}
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
code
==
TSDB_CODE_SUCCESS
)
{
SExecResult
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
if
(
pRes
->
res
!=
NULL
)
{
if
(
pRes
->
res
!=
NULL
)
{
code
=
handleAlterTbExecRes
(
pRes
->
res
,
pCatalog
);
}
}
...
...
@@ -3001,23 +3003,23 @@ end:
return
code
;
}
typedef
struct
{
typedef
struct
{
SVgroupInfo
vg
;
void
*
data
;
}
VgData
;
void
*
data
;
}
VgData
;
static
void
destroyVgHash
(
void
*
data
)
{
VgData
*
vgData
=
(
VgData
*
)
data
;
taosMemoryFreeClear
(
vgData
->
data
);
}
int
taos_write_raw_block
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int
taos_write_raw_block
(
TAOS
*
taos
,
int
rows
,
char
*
pData
,
const
char
*
tbname
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
)
{
if
(
!
pRequest
)
{
uError
(
"WriteRaw:createRequest error request is null"
);
code
=
terrno
;
goto
end
;
...
...
@@ -3033,9 +3035,9 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbname
);
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw: get gatlog error"
);
goto
end
;
}
...
...
@@ -3060,17 +3062,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
}
uint64_t
suid
=
(
TSDB_NORMAL_TABLE
==
pTableMeta
->
tableType
?
0
:
pTableMeta
->
suid
);
uint64_t
uid
=
pTableMeta
->
uid
;
int32_t
numOfCols
=
pTableMeta
->
tableInfo
.
numOfColumns
;
int32_t
numOfCols
=
pTableMeta
->
tableInfo
.
numOfColumns
;
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
SSchema
*
schema
=
pTableMeta
->
schema
+
i
;
SSchema
*
schema
=
pTableMeta
->
schema
+
i
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
}
}
...
...
@@ -3079,22 +3081,22 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
int32_t
schemaLen
=
0
;
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
SSubmitReq
*
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
pTableMeta
->
sversion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
fLen
);
int32_t
dataLen
=
0
;
char
*
pStart
=
pData
+
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int16_t
)
+
sizeof
(
int32_t
));
char
*
pStart
=
pData
+
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int16_t
)
+
sizeof
(
int32_t
));
int32_t
*
colLength
=
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
)
*
numOfCols
;
SResultColumn
*
pCol
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SResultColumn
));
SResultColumn
*
pCol
=
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SResultColumn
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
(
pTableMeta
->
schema
[
i
].
type
))
{
...
...
@@ -3113,7 +3115,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
tdSRowResetBuf
(
&
rb
,
rowData
);
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
numOfCols
;
k
++
)
{
const
SSchema
*
pColumn
=
&
pTableMeta
->
schema
[
k
];
const
SSchema
*
pColumn
=
&
pTableMeta
->
schema
[
k
];
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
if
(
pCol
[
k
].
offset
[
j
]
!=
-
1
)
{
...
...
@@ -3159,17 +3161,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
pQuery
->
pRoot
)
{
uError
(
"create pQuery->pRoot error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
nodeStmt
->
payloadType
=
PAYLOAD_TYPE_KV
;
nodeStmt
->
pDataBlocks
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
dst
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
...
...
@@ -3183,7 +3185,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
subReq
->
header
.
contLen
=
htonl
(
subReq
->
length
);
subReq
->
length
=
htonl
(
subReq
->
length
);
subReq
->
numOfBlocks
=
htonl
(
subReq
->
numOfBlocks
);
subReq
=
NULL
;
// no need free
subReq
=
NULL
;
// no need free
taosArrayPush
(
nodeStmt
->
pDataBlocks
,
&
dst
);
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
...
...
@@ -3195,16 +3197,16 @@ end:
return
code
;
}
static
int32_t
tmqWriteRaw
(
TAOS
*
taos
,
void
*
data
,
int32_t
dataLen
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
pVgHash
=
NULL
;
SQuery
*
pQuery
=
NULL
;
static
int32_t
tmqWriteRaw
(
TAOS
*
taos
,
void
*
data
,
int32_t
dataLen
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SHashObj
*
pVgHash
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SMqRspObj
rspObj
=
{
0
};
SDecoder
decoder
=
{
0
};
SDecoder
decoder
=
{
0
};
terrno
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
)
{
if
(
!
pRequest
)
{
uError
(
"WriteRaw:createRequest error request is null"
);
return
terrno
;
}
...
...
@@ -3214,7 +3216,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
tDecoderInit
(
&
decoder
,
data
,
dataLen
);
code
=
tDecodeSMqDataRsp
(
&
decoder
,
&
rspObj
.
rsp
);
if
(
code
!=
0
){
if
(
code
!=
0
)
{
uError
(
"WriteRaw:decode smqDataRsp error"
);
code
=
TSDB_CODE_INVALID_MSG
;
goto
end
;
...
...
@@ -3228,9 +3230,9 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
pVgHash
=
taosHashInit
(
16
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
taosHashSetFreeFp
(
pVgHash
,
destroyVgHash
);
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw: get gatlog error"
);
goto
end
;
}
...
...
@@ -3252,20 +3254,20 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
setResSchemaInfo
(
&
rspObj
.
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
code
=
setQueryResultFromRsp
(
&
rspObj
.
resInfo
,
pRetrieve
,
false
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"WriteRaw: setQueryResultFromRsp error"
);
goto
end
;
}
uint16_t
fLen
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
int32_t
rowSize
=
0
;
int16_t
nVar
=
0
;
for
(
int
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
SSchema
*
schema
=
pSW
->
pSchema
+
i
;
SSchema
*
schema
=
pSW
->
pSchema
+
i
;
fLen
+=
TYPE_BYTES
[
schema
->
type
];
rowSize
+=
schema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
if
(
IS_VAR_DATA_TYPE
(
schema
->
type
))
{
nVar
++
;
}
}
...
...
@@ -3276,7 +3278,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
const
char
*
tbName
=
(
const
char
*
)
taosArrayGetP
(
rspObj
.
rsp
.
blockTbName
,
rspObj
.
resIter
);
if
(
!
tbName
)
{
if
(
!
tbName
)
{
uError
(
"WriteRaw: tbname is null"
);
code
=
TSDB_CODE_TMQ_INVALID_MSG
;
goto
end
;
...
...
@@ -3296,12 +3298,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
SSubmitReq
*
subReq
=
NULL
;
SSubmitBlk
*
blk
=
NULL
;
void
*
hData
=
taosHashGet
(
pVgHash
,
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
));
if
(
hData
)
{
void
*
hData
=
taosHashGet
(
pVgHash
,
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
));
if
(
hData
)
{
vgData
=
*
(
VgData
*
)
hData
;
int32_t
totalLen
=
((
SSubmitReq
*
)(
vgData
.
data
))
->
length
+
submitLen
;
void
*
tmp
=
taosMemoryRealloc
(
vgData
.
data
,
totalLen
);
void
*
tmp
=
taosMemoryRealloc
(
vgData
.
data
,
totalLen
);
if
(
tmp
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
...
...
@@ -3310,15 +3312,15 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
((
VgData
*
)
hData
)
->
data
=
tmp
;
subReq
=
(
SSubmitReq
*
)(
vgData
.
data
);
blk
=
POINTER_SHIFT
(
vgData
.
data
,
subReq
->
length
);
}
else
{
}
else
{
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
void
*
tmp
=
taosMemoryCalloc
(
1
,
totalLen
);
void
*
tmp
=
taosMemoryCalloc
(
1
,
totalLen
);
if
(
tmp
==
NULL
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
}
vgData
.
data
=
tmp
;
taosHashPut
(
pVgHash
,
(
const
char
*
)
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
),
(
char
*
)
&
vgData
,
sizeof
(
vgData
));
taosHashPut
(
pVgHash
,
(
const
char
*
)
&
vgData
.
vg
.
vgId
,
sizeof
(
vgData
.
vg
.
vgId
),
(
char
*
)
&
vgData
,
sizeof
(
vgData
));
subReq
=
(
SSubmitReq
*
)(
vgData
.
data
);
subReq
->
length
=
sizeof
(
SSubmitReq
);
subReq
->
numOfBlocks
=
0
;
...
...
@@ -3336,7 +3338,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
uint64_t
uid
=
pTableMeta
->
uid
;
taosMemoryFreeClear
(
pTableMeta
);
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
SRowBuilder
rb
=
{
0
};
...
...
@@ -3352,12 +3354,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
int32_t
offset
=
0
;
for
(
int32_t
k
=
0
;
k
<
pSW
->
nCols
;
k
++
)
{
const
SSchema
*
pColumn
=
&
pSW
->
pSchema
[
k
];
char
*
data
=
rspObj
.
resInfo
.
row
[
k
];
const
SSchema
*
pColumn
=
&
pSW
->
pSchema
[
k
];
char
*
data
=
rspObj
.
resInfo
.
row
[
k
];
if
(
!
data
)
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NULL
,
NULL
,
false
,
offset
,
k
);
}
else
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
type
))
{
data
-=
VARSTR_HEADER_SIZE
;
}
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
data
,
true
,
offset
,
k
);
...
...
@@ -3389,21 +3391,21 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
pQuery
->
haveResultSet
=
false
;
pQuery
->
msgType
=
TDMT_VND_SUBMIT
;
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
pQuery
->
pRoot
=
(
SNode
*
)
nodesMakeNode
(
QUERY_NODE_VNODE_MODIF_STMT
);
if
(
NULL
==
pQuery
->
pRoot
)
{
uError
(
"create pQuery->pRoot error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
SVnodeModifOpStmt
*
nodeStmt
=
(
SVnodeModifOpStmt
*
)(
pQuery
->
pRoot
);
nodeStmt
->
payloadType
=
PAYLOAD_TYPE_KV
;
int32_t
numOfVg
=
taosHashGetSize
(
pVgHash
);
nodeStmt
->
pDataBlocks
=
taosArrayInit
(
numOfVg
,
POINTER_BYTES
);
VgData
*
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
NULL
);
VgData
*
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
NULL
);
while
(
vData
)
{
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
SVgDataBlocks
*
dst
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
dst
)
{
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
end
;
...
...
@@ -3413,14 +3415,14 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
dst
->
numOfTables
=
subReq
->
numOfBlocks
;
dst
->
size
=
subReq
->
length
;
dst
->
pData
=
(
char
*
)
subReq
;
vData
->
data
=
NULL
;
// no need free
vData
->
data
=
NULL
;
// no need free
subReq
->
header
.
vgId
=
htonl
(
dst
->
vg
.
vgId
);
subReq
->
version
=
htonl
(
1
);
subReq
->
header
.
contLen
=
htonl
(
subReq
->
length
);
subReq
->
length
=
htonl
(
subReq
->
length
);
subReq
->
numOfBlocks
=
htonl
(
subReq
->
numOfBlocks
);
taosArrayPush
(
nodeStmt
->
pDataBlocks
,
&
dst
);
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
vData
);
vData
=
(
VgData
*
)
taosHashIterate
(
pVgHash
,
vData
);
}
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
...
...
@@ -3459,8 +3461,8 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void
tmq_free_json_meta
(
char
*
jsonMeta
)
{
taosMemoryFreeClear
(
jsonMeta
);
}
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
)
{
if
(
!
raw
||
!
res
){
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
)
{
if
(
!
raw
||
!
res
)
{
return
TSDB_CODE_INVALID_PARA
;
}
if
(
TD_RES_TMQ_META
(
res
))
{
...
...
@@ -3468,8 +3470,8 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
raw
->
raw
=
pMetaRspObj
->
metaRsp
.
metaRsp
;
raw
->
raw_len
=
pMetaRspObj
->
metaRsp
.
metaRspLen
;
raw
->
raw_type
=
pMetaRspObj
->
metaRsp
.
resMsgType
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
rspObj
=
((
SMqRspObj
*
)
res
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
rspObj
=
((
SMqRspObj
*
)
res
);
int32_t
len
=
0
;
int32_t
code
=
0
;
...
...
@@ -3478,7 +3480,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
return
-
1
;
}
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
void
*
buf
=
taosMemoryCalloc
(
1
,
len
);
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
len
);
tEncodeSMqDataRsp
(
&
encoder
,
&
rspObj
->
rsp
);
...
...
@@ -3494,31 +3496,31 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
}
void
tmq_free_raw
(
tmq_raw_data
raw
)
{
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
){
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
)
{
taosMemoryFree
(
raw
.
raw
);
}
}
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
)
{
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
)
{
if
(
!
taos
)
{
return
TSDB_CODE_INVALID_PARA
;
}
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_STB
)
{
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_STB
)
{
return
taosCreateStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_STB
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_STB
)
{
return
taosCreateStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_STB
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_STB
)
{
return
taosDropStb
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_TABLE
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_CREATE_TABLE
)
{
return
taosCreateTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_TABLE
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_ALTER_TABLE
)
{
return
taosAlterTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_TABLE
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DROP_TABLE
)
{
return
taosDropTable
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DELETE
)
{
}
else
if
(
raw
.
raw_type
==
TDMT_VND_DELETE
)
{
return
taosDeleteData
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
else
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
)
{
}
else
if
(
raw
.
raw_type
==
RES_TYPE__TMQ
)
{
return
tmqWriteRaw
(
taos
,
raw
.
raw
,
raw
.
raw_len
);
}
return
TSDB_CODE_INVALID_PARA
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录