Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fc3b4785
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
fc3b4785
编写于
4月 12, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 12, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11409 from taosdata/feature/tq
refactor: TAOS_RES process
上级
5a70bddb
cf33a822
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
272 addition
and
204 deletion
+272
-204
example/src/tmq.c
example/src/tmq.c
+3
-2
include/common/tdatablock.h
include/common/tdatablock.h
+8
-3
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+72
-33
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+43
-44
source/client/src/clientMain.c
source/client/src/clientMain.c
+131
-77
source/client/src/tmq.c
source/client/src/tmq.c
+13
-42
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-3
未找到文件。
example/src/tmq.c
浏览文件 @
fc3b4785
...
...
@@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printf
(
"subscribe err
\n
"
);
return
;
}
/*int32_t cnt = 0;*/
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
/*cnt++;*/
cnt
++
;
printf
(
"get data
\n
"
);
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
/*} else {*/
...
...
include/common/tdatablock.h
浏览文件 @
fc3b4785
...
...
@@ -136,7 +136,8 @@ static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uin
}
static
FORCE_INLINE
void
colDataAppendInt16
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
int16_t
*
v
)
{
ASSERT
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_SMALLINT
||
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_USMALLINT
);
ASSERT
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_SMALLINT
||
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_USMALLINT
);
char
*
p
=
pColumnInfoData
->
pData
+
pColumnInfoData
->
info
.
bytes
*
currentRow
;
*
(
int16_t
*
)
p
=
*
(
int16_t
*
)
v
;
}
...
...
@@ -210,6 +211,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
static
FORCE_INLINE
int32_t
blockEstimateEncodeSize
(
const
SSDataBlock
*
pBlock
)
{
return
blockDataGetSerialMetaSize
(
pBlock
)
+
(
int32_t
)
ceil
(
blockDataGetSerialRowSize
(
pBlock
)
*
pBlock
->
info
.
rows
);
}
static
FORCE_INLINE
int32_t
blockCompressColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
int32_t
colSize
=
colDataGetLength
(
pColRes
,
numOfRows
);
...
...
source/client/inc/clientInt.h
浏览文件 @
fc3b4785
...
...
@@ -186,7 +186,24 @@ typedef struct SRequestSendRecvBody {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
enum
{
RES_TYPE__QUERY
=
1
,
RES_TYPE__TMQ
,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef
struct
SMqRspObj
{
int8_t
resType
;
char
*
topic
;
void
*
vg
;
SArray
*
res
;
// SArray<SReqResultInfo>
int32_t
resIter
;
}
SMqRspObj
;
typedef
struct
SRequestObj
{
int8_t
resType
;
// query or tmq
uint64_t
requestId
;
int32_t
type
;
// request type
STscObj
*
pTscObj
;
...
...
@@ -203,6 +220,25 @@ typedef struct SRequestObj {
SRequestSendRecvBody
body
;
}
SRequestObj
;
static
FORCE_INLINE
SReqResultInfo
*
tmqGetCurResInfo
(
TAOS_RES
*
res
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
int32_t
resIter
=
msg
->
resIter
==
-
1
?
0
:
msg
->
resIter
;
return
(
SReqResultInfo
*
)
taosArrayGet
(
msg
->
res
,
resIter
);
}
static
FORCE_INLINE
SReqResultInfo
*
tmqGetNextResInfo
(
TAOS_RES
*
res
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
if
(
++
msg
->
resIter
<
taosArrayGetSize
(
msg
->
res
))
{
return
(
SReqResultInfo
*
)
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
}
return
NULL
;
}
static
FORCE_INLINE
SReqResultInfo
*
tscGetCurResInfo
(
TAOS_RES
*
res
)
{
if
(
TD_RES_QUERY
(
res
))
return
&
(((
SRequestObj
*
)
res
)
->
body
.
resInfo
);
return
tmqGetCurResInfo
(
res
);
}
extern
SAppInfo
appInfo
;
extern
int32_t
clientReqRefPool
;
extern
int32_t
clientConnRefPool
;
...
...
@@ -238,14 +274,17 @@ void initMsgHandleFp();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
);
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
// --- heartbeat
// global, called by mgmt
...
...
source/client/src/clientEnv.c
浏览文件 @
fc3b4785
...
...
@@ -149,6 +149,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
return
NULL
;
}
pRequest
->
resType
=
RES_TYPE__QUERY
;
pRequest
->
pDb
=
getDbOfConnection
(
pObj
);
pRequest
->
requestId
=
generateRequestId
();
pRequest
->
metric
.
start
=
taosGetTimestampUs
();
...
...
source/client/src/clientImpl.c
浏览文件 @
fc3b4785
...
...
@@ -13,7 +13,6 @@
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
if
(
str
==
NULL
)
{
...
...
@@ -42,7 +41,6 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
...
...
@@ -211,13 +209,11 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{
.
queryId
=
pRequest
->
requestId
,
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
};
.
showRewrite
=
pQuery
->
showRewrite
};
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
if
(
code
!=
0
)
{
return
code
;
...
...
@@ -254,7 +250,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -320,7 +317,7 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
}
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
)
{
SCatalog
*
pCatalog
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
0
;
int32_t
dbNum
=
taosArrayGetSize
(
pRequest
->
dbList
);
int32_t
tblNum
=
taosArrayGetSize
(
pRequest
->
tableList
);
...
...
@@ -337,7 +334,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SEpSet
epset
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
for
(
int32_t
i
=
0
;
i
<
dbNum
;
++
i
)
{
char
*
dbFName
=
taosArrayGet
(
pRequest
->
dbList
,
i
);
char
*
dbFName
=
taosArrayGet
(
pRequest
->
dbList
,
i
);
code
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -346,7 +343,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
}
for
(
int32_t
i
=
0
;
i
<
tblNum
;
++
i
)
{
SName
*
tableName
=
taosArrayGet
(
pRequest
->
tableList
,
i
);
SName
*
tableName
=
taosArrayGet
(
pRequest
->
tableList
,
i
);
code
=
catalogRefreshTableMeta
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
tableName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -357,7 +354,6 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return
code
;
}
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
retryNum
=
0
;
...
...
@@ -509,7 +505,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
...
...
@@ -536,10 +533,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t
elapsed
=
pRequest
->
metric
.
rsp
-
pRequest
->
metric
.
start
;
if
(
pMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
" message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
else
{
tscError
(
"0x%"
PRIx64
" SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
taosReleaseRef
(
clientReqRefPool
,
pSendInfo
->
requestObjRefId
);
...
...
@@ -590,7 +587,7 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
return
taos_connect
(
ipStr
,
userStr
,
passStr
,
dbStr
,
port
);
}
static
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
)
{
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
)
{
for
(
int32_t
i
=
0
;
i
<
pResultInfo
->
numOfCols
;
++
i
)
{
SResultColumn
*
pCol
=
&
pResultInfo
->
pCol
[
i
];
...
...
@@ -770,7 +767,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
return
TSDB_CODE_SUCCESS
;
}
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
)
{
int32_t
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
,
bool
convertUcs4
)
{
assert
(
numOfCols
>
0
&&
pFields
!=
NULL
&&
pResultInfo
!=
NULL
);
if
(
numOfRows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -851,5 +849,6 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case
pResultInfo
->
totalRows
+=
pResultInfo
->
numOfRows
;
return
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
,
convertUcs4
);
return
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
,
convertUcs4
);
}
source/client/src/clientMain.c
浏览文件 @
fc3b4785
...
...
@@ -133,8 +133,7 @@ int taos_field_count(TAOS_RES *res) {
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
return
pResInfo
->
numOfCols
;
}
...
...
@@ -145,7 +144,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return
NULL
;
}
SReqResultInfo
*
pResInfo
=
&
(((
SRequestObj
*
)
res
)
->
body
.
resInfo
);
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
return
pResInfo
->
userFields
;
}
...
...
@@ -162,6 +161,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
NULL
;
}
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
...
...
@@ -169,6 +169,28 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
return
doFetchRow
(
pRequest
,
true
,
true
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SReqResultInfo
*
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
if
(
pResultInfo
->
row
==
NULL
)
{
msg
->
resIter
++
;
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
}
return
pResultInfo
->
row
;
}
else
{
// assert to avoid uninitialization error
ASSERT
(
0
);
}
return
NULL
;
}
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
...
...
@@ -260,12 +282,12 @@ int *taos_fetch_lengths(TAOS_RES *res) {
return
NULL
;
}
return
((
SRequestObj
*
)
res
)
->
body
.
resInfo
.
length
;
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
return
pResInfo
->
length
;
}
TAOS_ROW
*
taos_result_block
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
if
(
res
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
...
...
@@ -274,7 +296,8 @@ TAOS_ROW *taos_result_block(TAOS_RES *res) {
return
NULL
;
}
return
&
pRequest
->
body
.
resInfo
.
row
;
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
return
&
pResInfo
->
row
;
}
// todo intergrate with tDataTypes
...
...
@@ -313,7 +336,7 @@ const char *taos_data_type(int type) {
const
char
*
taos_get_client_info
()
{
return
version
;
}
int
taos_affected_rows
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
)
{
if
(
res
==
NULL
||
TD_RES_TMQ
(
res
)
)
{
return
0
;
}
...
...
@@ -323,12 +346,17 @@ int taos_affected_rows(TAOS_RES *res) {
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
if
(
res
==
NULL
)
{
return
TSDB_TIME_PRECISION_MILLI
;
}
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
return
pRequest
->
body
.
resInfo
.
precision
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
info
=
tmqGetCurResInfo
(
res
);
return
info
->
precision
;
}
return
TSDB_TIME_PRECISION_MILLI
;
}
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
)
{
...
...
@@ -370,31 +398,29 @@ void taos_stop_query(TAOS_RES *res) {
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
SRequestObj
*
pRequestObj
=
res
;
SReqResultInfo
*
pResultInfo
=
&
pRequestObj
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
tscGetCurResInfo
(
res
);
if
(
col
>=
pResultInfo
->
numOfCols
||
col
<
0
||
row
>=
pResultInfo
->
numOfRows
||
row
<
0
)
{
return
true
;
}
SResultColumn
*
pCol
=
&
pRe
questObj
->
body
.
resInfo
.
pCol
[
col
];
SResultColumn
*
pCol
=
&
pRe
sultInfo
->
pCol
[
col
];
return
colDataIsNull_f
(
pCol
->
nullbitmap
,
row
);
}
bool
taos_is_update_query
(
TAOS_RES
*
res
)
{
return
taos_num_fields
(
res
)
==
0
;
}
bool
taos_is_update_query
(
TAOS_RES
*
res
)
{
return
taos_num_fields
(
res
)
==
0
;
}
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
int32_t
numOfRows
=
0
;
/*int32_t code = */
taos_fetch_block_s
(
res
,
&
numOfRows
,
rows
);
/*int32_t code = */
taos_fetch_block_s
(
res
,
&
numOfRows
,
rows
);
return
numOfRows
;
}
int
taos_fetch_block_s
(
TAOS_RES
*
res
,
int
*
numOfRows
,
TAOS_ROW
*
rows
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
int
taos_fetch_block_s
(
TAOS_RES
*
res
,
int
*
numOfRows
,
TAOS_ROW
*
rows
)
{
if
(
res
==
NULL
)
{
return
0
;
}
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
(
*
rows
)
=
NULL
;
(
*
numOfRows
)
=
0
;
...
...
@@ -413,13 +439,26 @@ int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
(
*
rows
)
=
pResultInfo
->
row
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
return
pRequest
->
code
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
if
(
pResultInfo
==
NULL
)
return
-
1
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
rows
)
=
pResultInfo
->
row
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
return
0
;
}
else
{
ASSERT
(
0
);
return
-
1
;
}
}
int
taos_fetch_raw_block
(
TAOS_RES
*
res
,
int
*
numOfRows
,
void
**
pData
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
int
taos_fetch_raw_block
(
TAOS_RES
*
res
,
int
*
numOfRows
,
void
**
pData
)
{
if
(
res
==
NULL
)
{
return
0
;
}
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
...
...
@@ -432,28 +471,42 @@ int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
if
(
pResultInfo
==
NULL
)
return
-
1
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
}
else
{
ASSERT
(
0
);
return
-
1
;
}
}
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
if
(
res
==
NULL
)
{
return
0
;
}
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
int32_t
numOfFields
=
taos_num_fields
(
res
);
if
(
columnIndex
<
0
||
columnIndex
>=
numOfFields
||
numOfFields
==
0
)
{
return
0
;
}
TAOS_FIELD
*
pField
=
&
pRequest
->
body
.
resInfo
.
userFields
[
columnIndex
];
SReqResultInfo
*
pResInfo
=
tscGetCurResInfo
(
res
);
TAOS_FIELD
*
pField
=
&
pResInfo
->
userFields
[
columnIndex
];
if
(
!
IS_VAR_DATA_TYPE
(
pField
->
type
))
{
return
0
;
}
return
pRe
quest
->
body
.
resInfo
.
pCol
[
columnIndex
].
offset
;
return
pRe
sInfo
->
pCol
[
columnIndex
].
offset
;
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
...
...
@@ -483,7 +536,8 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
// TODO
}
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
// TODO
return
NULL
;
}
...
...
@@ -562,7 +616,7 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return
-
1
;
}
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
...
...
@@ -572,7 +626,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
return
NULL
;
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
// TODO
return
-
1
;
}
source/client/src/tmq.c
浏览文件 @
fc3b4785
...
...
@@ -17,25 +17,19 @@
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tqueue.h"
#include "tref.h"
typedef
struct
{
int32_t
curBlock
;
int32_t
curRow
;
void
**
uData
;
}
SMqRowIter
;
struct
tmq_message_t
{
SMqPollRsp
msg
;
char
*
topic
;
void
*
vg
;
SMqRowIter
iter
;
SArray
*
res
;
// SArray<SReqResultInfo>
int32_t
resIter
;
};
struct
tmq_list_t
{
...
...
@@ -849,7 +843,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
msgEpoch
<
tmqEpoch
)
{
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
return
0
;
}
...
...
@@ -886,8 +881,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
msg
);
pRsp
->
iter
.
curBlock
=
0
;
pRsp
->
iter
.
curRow
=
0
;
/*pRsp->iter.curBlock = 0;*/
/*pRsp->iter.curRow = 0;*/
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
...
...
@@ -899,8 +894,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pParam
->
pVg
->
vgId
,
pRsp
->
msg
.
reqOffset
,
pRsp
->
msg
.
rspOffset
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pParam
->
pVg
->
vgId
,
pRsp
->
msg
.
r
eqOffset
,
pRsp
->
msg
.
r
spOffset
);
pRsp
->
vg
=
pParam
->
pVg
;
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
...
...
@@ -921,7 +916,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
bool
set
=
false
;
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
SArray
*
newTopics
=
taosArrayInit
(
topicNumGet
,
sizeof
(
SMqClientTopic
));
if
(
newTopics
==
NULL
)
{
return
false
;
...
...
@@ -1289,7 +1285,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
...
...
@@ -1566,30 +1563,4 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return
"fail"
;
}
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
)
{
SMqPollRsp
*
rsp
=
&
message
->
msg
;
while
(
1
)
{
if
(
message
->
iter
.
curBlock
<
taosArrayGetSize
(
rsp
->
pBlockData
))
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
rsp
->
pBlockData
,
message
->
iter
.
curBlock
);
if
(
message
->
iter
.
curRow
<
pBlock
->
info
.
rows
)
{
for
(
int
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
i
++
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
if
(
colDataIsNull_s
(
pData
,
message
->
iter
.
curRow
))
message
->
iter
.
uData
[
i
]
=
NULL
;
else
{
message
->
iter
.
uData
[
i
]
=
colDataGetData
(
pData
,
message
->
iter
.
curRow
);
}
}
message
->
iter
.
curRow
++
;
return
message
->
iter
.
uData
;
}
else
{
message
->
iter
.
curBlock
++
;
message
->
iter
.
curRow
=
0
;
continue
;
}
}
return
NULL
;
}
}
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
)
{
return
"not implemented yet"
;
}
source/libs/executor/src/dataDispatcher.c
浏览文件 @
fc3b4785
...
...
@@ -92,9 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return
false
;
}
// NOTE: there are four bytes of an integer more than the required buffer space.
// struct size + data payload + length for each column + bitmap length
pBuf
->
allocSize
=
sizeof
(
SRetrieveTableRsp
)
+
blockDataGetSerialMetaSize
(
pInput
->
pData
)
+
blockDataGetSize
(
pInput
->
pData
);
pBuf
->
allocSize
=
sizeof
(
SRetrieveTableRsp
)
+
blockEstimateEncodeSize
(
pInput
->
pData
);
pBuf
->
pData
=
taosMemoryMalloc
(
pBuf
->
allocSize
);
if
(
pBuf
->
pData
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录