Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
dceb51d4
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dceb51d4
编写于
4月 27, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
print rid instead of pointer
上级
a3b40170
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
125 addition
and
126 deletion
+125
-126
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+1
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+8
-8
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+3
-3
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+39
-39
src/client/src/tscSql.c
src/client/src/tscSql.c
+6
-7
src/client/src/tscStream.c
src/client/src/tscStream.c
+12
-12
src/client/src/tscSub.c
src/client/src/tscSub.c
+2
-2
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+41
-41
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+7
-7
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
dceb51d4
...
@@ -49,7 +49,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
...
@@ -49,7 +49,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql
->
sqlstr
=
calloc
(
1
,
sqlLen
+
1
);
pSql
->
sqlstr
=
calloc
(
1
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"
%p failed to malloc sql string buffer"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscAsyncResultOnError
(
pSql
);
tscAsyncResultOnError
(
pSql
);
return
;
return
;
...
@@ -80,7 +80,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
...
@@ -80,7 +80,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
TAOS_RES
*
taos_query_ra
(
TAOS
*
taos
,
const
char
*
sqlstr
,
__async_cb_func_t
fp
,
void
*
param
)
{
TAOS_RES
*
taos_query_ra
(
TAOS
*
taos
,
const
char
*
sqlstr
,
__async_cb_func_t
fp
,
void
*
param
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
tscError
(
"
bug!!! pObj:%p
"
,
pObj
);
tscError
(
"
pObj:%p is NULL or freed
"
,
pObj
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_TSC_DISCONNECTED
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_TSC_DISCONNECTED
);
return
NULL
;
return
NULL
;
...
@@ -288,7 +288,7 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
...
@@ -288,7 +288,7 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
}
}
assert
(
pSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
);
assert
(
pSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
);
tscError
(
"
%p invoke user specified function due to error occurred, code:%s"
,
pSql
,
tstrerror
(
pSql
->
res
.
code
));
tscError
(
"
0x%"
PRIx64
" async result callback, code:%s"
,
pSql
->
self
,
tstrerror
(
pSql
->
res
.
code
));
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pSql
->
fp
==
NULL
||
pSql
->
fetchFp
==
NULL
){
if
(
pSql
->
fp
==
NULL
||
pSql
->
fetchFp
==
NULL
){
...
@@ -368,7 +368,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
...
@@ -368,7 +368,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj
*
sub
=
(
SSqlObj
*
)
res
;
SSqlObj
*
sub
=
(
SSqlObj
*
)
res
;
const
char
*
msg
=
(
sub
->
cmd
.
command
==
TSDB_SQL_STABLEVGROUP
)
?
"vgroup-list"
:
"table-meta"
;
const
char
*
msg
=
(
sub
->
cmd
.
command
==
TSDB_SQL_STABLEVGROUP
)
?
"vgroup-list"
:
"table-meta"
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p get %s failed, code:%s"
,
pSql
,
msg
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" get %s failed, code:%s"
,
pSql
->
self
,
msg
,
tstrerror
(
code
));
goto
_error
;
goto
_error
;
}
}
...
...
src/client/src/tscLocal.c
浏览文件 @
dceb51d4
...
@@ -926,7 +926,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
...
@@ -926,7 +926,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pRes
->
code
=
tscProcessServStatus
(
pSql
);
pRes
->
code
=
tscProcessServStatus
(
pSql
);
}
else
{
}
else
{
pRes
->
code
=
TSDB_CODE_TSC_INVALID_SQL
;
pRes
->
code
=
TSDB_CODE_TSC_INVALID_SQL
;
tscError
(
"
%p not support command:%d"
,
pSql
,
pCmd
->
command
);
tscError
(
"
0x%"
PRIx64
" not support command:%d"
,
pSql
->
self
,
pCmd
->
command
);
}
}
// keep the code in local variable in order to avoid invalid read in case of async query
// keep the code in local variable in order to avoid invalid read in case of async query
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
dceb51d4
...
@@ -177,14 +177,14 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
...
@@ -177,14 +177,14 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
if
(
pMemBuffer
==
NULL
)
{
if
(
pMemBuffer
==
NULL
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscError
(
"
%p pMemBuffer
is NULL"
,
pMemBuffer
);
tscError
(
"
pMemBuffer:%p
is NULL"
,
pMemBuffer
);
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
return
;
return
;
}
}
if
(
pDesc
->
pColumnModel
==
NULL
)
{
if
(
pDesc
->
pColumnModel
==
NULL
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscError
(
"
%p no local buffer or intermediate result format model"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" no local buffer or intermediate result format model"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
return
;
return
;
}
}
...
@@ -208,7 +208,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
...
@@ -208,7 +208,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
}
}
if
(
pDesc
->
pColumnModel
->
capacity
>=
pMemBuffer
[
0
]
->
pageSize
)
{
if
(
pDesc
->
pColumnModel
->
capacity
>=
pMemBuffer
[
0
]
->
pageSize
)
{
tscError
(
"
%p Invalid value of buffer capacity %d and page size %d "
,
pSql
,
pDesc
->
pColumnModel
->
capacity
,
tscError
(
"
0x%"
PRIx64
" Invalid value of buffer capacity %d and page size %d "
,
pSql
->
self
,
pDesc
->
pColumnModel
->
capacity
,
pMemBuffer
[
0
]
->
pageSize
);
pMemBuffer
[
0
]
->
pageSize
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
...
@@ -220,7 +220,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
...
@@ -220,7 +220,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
SLocalMerger
*
pReducer
=
(
SLocalMerger
*
)
calloc
(
1
,
size
);
SLocalMerger
*
pReducer
=
(
SLocalMerger
*
)
calloc
(
1
,
size
);
if
(
pReducer
==
NULL
)
{
if
(
pReducer
==
NULL
)
{
tscError
(
"
%p failed to create local merge structure, out of memory"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to create local merge structure, out of memory"
,
pSql
->
self
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
pFFModel
,
numOfBuffer
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
@@ -244,7 +244,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
...
@@ -244,7 +244,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
for
(
int32_t
j
=
0
;
j
<
numOfFlushoutInFile
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
numOfFlushoutInFile
;
++
j
)
{
SLocalDataSource
*
ds
=
(
SLocalDataSource
*
)
malloc
(
sizeof
(
SLocalDataSource
)
+
pMemBuffer
[
0
]
->
pageSize
);
SLocalDataSource
*
ds
=
(
SLocalDataSource
*
)
malloc
(
sizeof
(
SLocalDataSource
)
+
pMemBuffer
[
0
]
->
pageSize
);
if
(
ds
==
NULL
)
{
if
(
ds
==
NULL
)
{
tscError
(
"
%p failed to create merge structure"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to create merge structure"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tfree
(
pReducer
);
tfree
(
pReducer
);
return
;
return
;
...
@@ -674,7 +674,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
...
@@ -674,7 +674,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
(
*
pMemBuffer
)
=
(
tExtMemBuffer
**
)
malloc
(
POINTER_BYTES
*
pSql
->
subState
.
numOfSub
);
(
*
pMemBuffer
)
=
(
tExtMemBuffer
**
)
malloc
(
POINTER_BYTES
*
pSql
->
subState
.
numOfSub
);
if
(
*
pMemBuffer
==
NULL
)
{
if
(
*
pMemBuffer
==
NULL
)
{
tscError
(
"
%p failed to allocate memory"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to allocate memory"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
pRes
->
code
;
return
pRes
->
code
;
}
}
...
@@ -683,7 +683,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
...
@@ -683,7 +683,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema
=
(
SSchema
*
)
calloc
(
1
,
sizeof
(
SSchema
)
*
size
);
pSchema
=
(
SSchema
*
)
calloc
(
1
,
sizeof
(
SSchema
)
*
size
);
if
(
pSchema
==
NULL
)
{
if
(
pSchema
==
NULL
)
{
tscError
(
"
%p failed to allocate memory"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to allocate memory"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
pRes
->
code
;
return
pRes
->
code
;
}
}
...
@@ -1529,7 +1529,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
...
@@ -1529,7 +1529,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
return
pRes
->
code
;
return
pRes
->
code
;
}
}
tscError
(
"
%p local merge abort due to error occurs, code:%s"
,
pSql
,
tstrerror
(
pRes
->
code
));
tscError
(
"
0x%"
PRIx64
" local merge abort due to error occurs, code:%s"
,
pSql
->
self
,
tstrerror
(
pRes
->
code
));
return
pRes
->
code
;
return
pRes
->
code
;
}
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
dceb51d4
...
@@ -1141,7 +1141,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
...
@@ -1141,7 +1141,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
return
code
;
return
code
;
}
}
tscError
(
"
%p async insert parse error, code:%s"
,
pSql
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" async insert parse error, code:%s"
,
pSql
->
self
,
tstrerror
(
code
));
pCmd
->
curSql
=
NULL
;
pCmd
->
curSql
=
NULL
;
goto
_clean
;
goto
_clean
;
}
}
...
@@ -1409,7 +1409,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
...
@@ -1409,7 +1409,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
assert
(
pSql
->
res
.
numOfRows
==
0
);
assert
(
pSql
->
res
.
numOfRows
==
0
);
int32_t
ret
=
fseek
(
fp
,
0
,
SEEK_SET
);
int32_t
ret
=
fseek
(
fp
,
0
,
SEEK_SET
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
tscError
(
"
%p failed to seek SEEK_SET since:%s"
,
pSql
,
tstrerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to seek SEEK_SET since:%s"
,
pSql
->
self
,
tstrerror
(
errno
));
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_error
;
goto
_error
;
}
}
...
@@ -1529,7 +1529,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
...
@@ -1529,7 +1529,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
FILE
*
fp
=
fopen
(
pCmd
->
payload
,
"rb"
);
FILE
*
fp
=
fopen
(
pCmd
->
payload
,
"rb"
);
if
(
fp
==
NULL
)
{
if
(
fp
==
NULL
)
{
pSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
pSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
tscError
(
"
%p failed to open file %s to load data from file, code:%s"
,
pSql
,
pCmd
->
payload
,
tstrerror
(
pSql
->
res
.
code
));
tscError
(
"
0x%"
PRIx64
" failed to open file %s to load data from file, code:%s"
,
pSql
->
self
,
pCmd
->
payload
,
tstrerror
(
pSql
->
res
.
code
));
tfree
(
pSupporter
);
tfree
(
pSupporter
);
taos_free_result
(
pNew
);
taos_free_result
(
pNew
);
...
...
src/client/src/tscProfile.c
浏览文件 @
dceb51d4
...
@@ -104,7 +104,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
...
@@ -104,7 +104,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
char
*
sql
=
malloc
(
sqlSize
);
char
*
sql
=
malloc
(
sqlSize
);
if
(
sql
==
NULL
)
{
if
(
sql
==
NULL
)
{
tscError
(
"
%p failed to allocate memory to sent slow query to dnode"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to allocate memory to sent slow query to dnode"
,
pSql
->
self
);
return
;
return
;
}
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
dceb51d4
...
@@ -5187,7 +5187,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
...
@@ -5187,7 +5187,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
int32_t
size
=
sizeof
(
SUpdateTableTagValMsg
)
+
pTagsSchema
->
bytes
+
schemaLen
+
TSDB_EXTRA_PAYLOAD_SIZE
;
int32_t
size
=
sizeof
(
SUpdateTableTagValMsg
)
+
pTagsSchema
->
bytes
+
schemaLen
+
TSDB_EXTRA_PAYLOAD_SIZE
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"
%p failed to malloc for alter table msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for alter table msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
...
src/client/src/tscServer.c
浏览文件 @
dceb51d4
...
@@ -221,7 +221,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...
@@ -221,7 +221,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
assert
(
online
<=
total
);
assert
(
online
<=
total
);
if
(
online
<
total
)
{
if
(
online
<
total
)
{
tscError
(
"
HB:%p, total dnode:%d, online dnode:%d"
,
pSql
,
total
,
online
);
tscError
(
"
0x%"
PRIx64
", HB, total dnode:%d, online dnode:%d"
,
pSql
->
self
,
total
,
online
);
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
}
}
...
@@ -273,7 +273,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
...
@@ -273,7 +273,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
taosReleaseRef
(
tscObjRef
,
pObj
->
hbrid
);
taosReleaseRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p failed to sent HB to server, reason:%s"
,
pHB
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" failed to sent HB to server, reason:%s"
,
pHB
->
self
,
tstrerror
(
code
));
}
}
taosReleaseRef
(
tscRefId
,
rid
);
taosReleaseRef
(
tscRefId
,
rid
);
...
@@ -285,7 +285,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
...
@@ -285,7 +285,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char
*
pMsg
=
rpcMallocCont
(
pCmd
->
payloadLen
);
char
*
pMsg
=
rpcMallocCont
(
pCmd
->
payloadLen
);
if
(
NULL
==
pMsg
)
{
if
(
NULL
==
pMsg
)
{
tscError
(
"
%p msg:%s malloc failed"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
tscError
(
"
0x%"
PRIx64
" msg:%s malloc failed"
,
pSql
->
self
,
taosMsg
[
pSql
->
cmd
.
msgType
]);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -369,11 +369,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
...
@@ -369,11 +369,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
rpcMsg
->
code
==
TSDB_CODE_APP_NOT_READY
))
{
rpcMsg
->
code
==
TSDB_CODE_APP_NOT_READY
))
{
pSql
->
retry
++
;
pSql
->
retry
++
;
tscWarn
(
"
%p it shall renew table meta, code:%s, retry:%d"
,
pSql
,
tstrerror
(
rpcMsg
->
code
),
pSql
->
retry
);
tscWarn
(
"
0x%"
PRIx64
" it shall renew table meta, code:%s, retry:%d"
,
pSql
->
self
,
tstrerror
(
rpcMsg
->
code
),
pSql
->
retry
);
pSql
->
res
.
code
=
rpcMsg
->
code
;
// keep the previous error code
pSql
->
res
.
code
=
rpcMsg
->
code
;
// keep the previous error code
if
(
pSql
->
retry
>
pSql
->
maxRetry
)
{
if
(
pSql
->
retry
>
pSql
->
maxRetry
)
{
tscError
(
"
%p max retry %d reached, give up"
,
pSql
,
pSql
->
maxRetry
);
tscError
(
"
0x%"
PRIx64
" max retry %d reached, give up"
,
pSql
->
self
,
pSql
->
maxRetry
);
}
else
{
}
else
{
// wait for a little bit moment and then retry
// wait for a little bit moment and then retry
// todo do not sleep in rpc callback thread, add this process into queueu to process
// todo do not sleep in rpc callback thread, add this process into queueu to process
...
@@ -664,7 +664,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
...
@@ -664,7 +664,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
}
else
{
}
else
{
tscError
(
"
%p No vgroup info found"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" No vgroup info found"
,
pSql
->
self
);
*
succeed
=
0
;
*
succeed
=
0
;
return
pMsg
;
return
pMsg
;
...
@@ -733,7 +733,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -733,7 +733,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
size
=
tscEstimateQueryMsgSize
(
pSql
,
pCmd
->
clauseIndex
);
int32_t
size
=
tscEstimateQueryMsgSize
(
pSql
,
pCmd
->
clauseIndex
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_SQL
;
// todo add test for this
return
TSDB_CODE_TSC_INVALID_SQL
;
// todo add test for this
}
}
...
@@ -743,19 +743,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -743,19 +743,19 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size_t
numOfSrcCols
=
taosArrayGetSize
(
pQueryInfo
->
colList
);
size_t
numOfSrcCols
=
taosArrayGetSize
(
pQueryInfo
->
colList
);
if
(
numOfSrcCols
<=
0
&&
!
tscQueryTags
(
pQueryInfo
)
&&
!
tscQueryBlockInfo
(
pQueryInfo
))
{
if
(
numOfSrcCols
<=
0
&&
!
tscQueryTags
(
pQueryInfo
)
&&
!
tscQueryBlockInfo
(
pQueryInfo
))
{
tscError
(
"
%p illegal value of numOfCols in query msg: %"
PRIu64
", table cols:%d"
,
pSql
,
(
uint64_t
)
numOfSrcCols
,
tscError
(
"
0x%"
PRIx64
" illegal value of numOfCols in query msg: %"
PRIu64
", table cols:%d"
,
pSql
->
self
,
(
uint64_t
)
numOfSrcCols
,
tscGetNumOfColumns
(
pTableMeta
));
tscGetNumOfColumns
(
pTableMeta
));
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
if
(
pQueryInfo
->
interval
.
interval
<
0
)
{
if
(
pQueryInfo
->
interval
.
interval
<
0
)
{
tscError
(
"
%p illegal value of aggregation time interval in query msg: %"
PRId64
,
pSql
,
(
int64_t
)
pQueryInfo
->
interval
.
interval
);
tscError
(
"
0x%"
PRIx64
" illegal value of aggregation time interval in query msg: %"
PRId64
,
pSql
->
self
,
(
int64_t
)
pQueryInfo
->
interval
.
interval
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
<
0
)
{
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
<
0
)
{
tscError
(
"
%p illegal value of numOfGroupCols in query msg: %d"
,
pSql
,
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
tscError
(
"
0x%"
PRIx64
" illegal value of numOfGroupCols in query msg: %d"
,
pSql
->
self
,
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
@@ -813,8 +813,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -813,8 +813,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tscError
(
"
%p
tid:%d uid:%"
PRIu64
" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s"
,
tscError
(
"
0x%"
PRIx64
"
tid:%d uid:%"
PRIu64
" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s"
,
pSql
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
tscGetNumOfColumns
(
pTableMeta
),
pCol
->
colIndex
.
columnIndex
,
pSql
->
self
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
tscGetNumOfColumns
(
pTableMeta
),
pCol
->
colIndex
.
columnIndex
,
pColSchema
->
name
);
pColSchema
->
name
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
@@ -859,12 +859,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -859,12 +859,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// the queried table has been removed and a new table with the same name has already been created already
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
// return error msg
if
(
pExpr
->
uid
!=
pTableMeta
->
id
.
uid
)
{
if
(
pExpr
->
uid
!=
pTableMeta
->
id
.
uid
)
{
tscError
(
"
%p table has already been destroyed"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" table has already been destroyed"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
}
}
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
tscError
(
"
%p table schema is not matched with parsed sql"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" table schema is not matched with parsed sql"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
@@ -965,12 +965,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -965,12 +965,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// the queried table has been removed and a new table with the same name has already been created already
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
// return error msg
if
(
pExpr
->
uid
!=
pTableMeta
->
id
.
uid
)
{
if
(
pExpr
->
uid
!=
pTableMeta
->
id
.
uid
)
{
tscError
(
"
%p table has already been destroyed"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" table has already been destroyed"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
}
}
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
tscError
(
"
%p table schema is not matched with parsed sql"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" table schema is not matched with parsed sql"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
@@ -1076,8 +1076,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1076,8 +1076,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char
n
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
char
n
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tscError
(
"
%p
tid:%d uid:%"
PRIu64
" id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s"
,
tscError
(
"
0x%"
PRIx64
"
tid:%d uid:%"
PRIu64
" id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s"
,
pSql
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
total
,
numOfTagColumns
,
pCol
->
colIndex
.
columnIndex
,
pColSchema
->
name
);
pSql
->
self
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
total
,
numOfTagColumns
,
pCol
->
colIndex
.
columnIndex
,
pColSchema
->
name
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
@@ -1170,7 +1170,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1170,7 +1170,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SCreateDnodeMsg
);
pCmd
->
payloadLen
=
sizeof
(
SCreateDnodeMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1188,7 +1188,7 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1188,7 +1188,7 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SCreateAcctMsg
);
pCmd
->
payloadLen
=
sizeof
(
SCreateAcctMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1234,7 +1234,7 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1234,7 +1234,7 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SCreateUserMsg
);
pCmd
->
payloadLen
=
sizeof
(
SCreateUserMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1273,7 +1273,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1273,7 +1273,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SDropDbMsg
);
pCmd
->
payloadLen
=
sizeof
(
SDropDbMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1295,7 +1295,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1295,7 +1295,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SCMDropTableMsg
);
pCmd
->
payloadLen
=
sizeof
(
SCMDropTableMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1316,7 +1316,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1316,7 +1316,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SDropDnodeMsg
);
pCmd
->
payloadLen
=
sizeof
(
SDropDnodeMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1337,7 +1337,7 @@ int32_t tscBuildDropUserAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1337,7 +1337,7 @@ int32_t tscBuildDropUserAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
msgType
=
(
pInfo
->
type
==
TSDB_SQL_DROP_USER
)
?
TSDB_MSG_TYPE_CM_DROP_USER
:
TSDB_MSG_TYPE_CM_DROP_ACCT
;
pCmd
->
msgType
=
(
pInfo
->
type
==
TSDB_SQL_DROP_USER
)
?
TSDB_MSG_TYPE_CM_DROP_USER
:
TSDB_MSG_TYPE_CM_DROP_ACCT
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1352,7 +1352,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1352,7 +1352,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SUseDbMsg
);
pCmd
->
payloadLen
=
sizeof
(
SUseDbMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1369,7 +1369,7 @@ int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
...
@@ -1369,7 +1369,7 @@ int32_t tscBuildSyncDbReplicaMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SSyncDbMsg
);
pCmd
->
payloadLen
=
sizeof
(
SSyncDbMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1388,7 +1388,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1388,7 +1388,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SShowMsg
)
+
100
;
pCmd
->
payloadLen
=
sizeof
(
SShowMsg
)
+
100
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1474,7 +1474,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1474,7 +1474,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// Reallocate the payload size
// Reallocate the payload size
size
=
tscEstimateCreateTableMsgLength
(
pSql
,
pInfo
);
size
=
tscEstimateCreateTableMsgLength
(
pSql
,
pInfo
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"
%p failed to malloc for create table msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for create table msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1573,7 +1573,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1573,7 +1573,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SAlterTableInfo
*
pAlterInfo
=
pInfo
->
pAlterInfo
;
SAlterTableInfo
*
pAlterInfo
=
pInfo
->
pAlterInfo
;
int
size
=
tscEstimateAlterTableMsgLength
(
pCmd
);
int
size
=
tscEstimateAlterTableMsgLength
(
pCmd
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"
%p failed to malloc for alter table msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for alter table msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1647,7 +1647,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1647,7 +1647,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
pCmd
->
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1753,7 +1753,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1753,7 +1753,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd
->
payloadLen
=
sizeof
(
SConnectMsg
);
pCmd
->
payloadLen
=
sizeof
(
SConnectMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"
%p failed to malloc for query msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc for query msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1893,7 +1893,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...
@@ -1893,7 +1893,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
size
=
numOfQueries
*
sizeof
(
SQueryDesc
)
+
numOfStreams
*
sizeof
(
SStreamDesc
)
+
sizeof
(
SHeartBeatMsg
)
+
100
;
int
size
=
numOfQueries
*
sizeof
(
SQueryDesc
)
+
numOfStreams
*
sizeof
(
SStreamDesc
)
+
sizeof
(
SHeartBeatMsg
)
+
100
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
pthread_mutex_unlock
(
&
pObj
->
mutex
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
tscError
(
"
%p failed to create heartbeat msg"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to create heartbeat msg"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -1969,7 +1969,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
...
@@ -1969,7 +1969,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMeta
*
pTableMeta
=
tscCreateTableMetaFromMsg
(
pMetaMsg
);
STableMeta
*
pTableMeta
=
tscCreateTableMetaFromMsg
(
pMetaMsg
);
if
(
!
tIsValidSchema
(
pTableMeta
->
schema
,
pTableMeta
->
tableInfo
.
numOfColumns
,
pTableMeta
->
tableInfo
.
numOfTags
))
{
if
(
!
tIsValidSchema
(
pTableMeta
->
schema
,
pTableMeta
->
tableInfo
.
numOfColumns
,
pTableMeta
->
tableInfo
.
numOfTags
))
{
tscError
(
"
%p invalid table meta from mnode, name:%s"
,
pSql
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
tscError
(
"
0x%"
PRIx64
" invalid table meta from mnode, name:%s"
,
pSql
->
self
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
return
TSDB_CODE_TSC_INVALID_VALUE
;
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
...
@@ -2163,7 +2163,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
...
@@ -2163,7 +2163,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
pInfo
->
vgroupList
->
numOfVgroups
=
pVgroupMsg
->
numOfVgroups
;
pInfo
->
vgroupList
->
numOfVgroups
=
pVgroupMsg
->
numOfVgroups
;
if
(
pInfo
->
vgroupList
->
numOfVgroups
<=
0
)
{
if
(
pInfo
->
vgroupList
->
numOfVgroups
<=
0
)
{
//tfree(pInfo->vgroupList);
//tfree(pInfo->vgroupList);
tscError
(
"
%p empty vgroup info"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" empty vgroup info"
,
pSql
->
self
);
}
else
{
}
else
{
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgroupList
->
numOfVgroups
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgroupList
->
numOfVgroups
;
++
j
)
{
// just init, no need to lock
// just init, no need to lock
...
@@ -2475,7 +2475,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
...
@@ -2475,7 +2475,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static
int32_t
getTableMetaFromMnode
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
)
{
static
int32_t
getTableMetaFromMnode
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
)
{
SSqlObj
*
pNew
=
calloc
(
1
,
sizeof
(
SSqlObj
));
SSqlObj
*
pNew
=
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
NULL
==
pNew
)
{
if
(
NULL
==
pNew
)
{
tscError
(
"
%p malloc failed for new sqlobj to get table meta"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" malloc failed for new sqlobj to get table meta"
,
pSql
->
self
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -2489,7 +2489,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
...
@@ -2489,7 +2489,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
pNew
->
cmd
.
autoCreated
=
pSql
->
cmd
.
autoCreated
;
// create table if not exists
pNew
->
cmd
.
autoCreated
=
pSql
->
cmd
.
autoCreated
;
// create table if not exists
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
tscError
(
"
%p malloc failed for payload to get table meta"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" malloc failed for payload to get table meta"
,
pSql
->
self
);
tscFreeSqlObj
(
pNew
);
tscFreeSqlObj
(
pNew
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -2502,7 +2502,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
...
@@ -2502,7 +2502,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
if
(
pSql
->
cmd
.
autoCreated
)
{
if
(
pSql
->
cmd
.
autoCreated
)
{
int32_t
code
=
copyTagData
(
&
pNew
->
cmd
.
tagData
,
&
pSql
->
cmd
.
tagData
);
int32_t
code
=
copyTagData
(
&
pNew
->
cmd
.
tagData
,
&
pSql
->
cmd
.
tagData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p malloc failed for new tag data to get table meta"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" malloc failed for new tag data to get table meta"
,
pSql
->
self
);
tscFreeSqlObj
(
pNew
);
tscFreeSqlObj
(
pNew
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -2580,7 +2580,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
...
@@ -2580,7 +2580,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
name
);
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
name
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p failed to generate the table full name"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to generate the table full name"
,
pSql
->
self
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
...
...
src/client/src/tscSql.c
浏览文件 @
dceb51d4
...
@@ -588,7 +588,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
...
@@ -588,7 +588,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
void
taos_free_result
(
TAOS_RES
*
res
)
{
void
taos_free_result
(
TAOS_RES
*
res
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"
%p already released sqlObj"
,
res
);
tscError
(
"
0x%"
PRIx64
" already released sqlObj"
,
pSql
?
pSql
->
self
:
-
1
);
return
;
return
;
}
}
...
@@ -881,15 +881,14 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
...
@@ -881,15 +881,14 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
int32_t
sqlLen
=
(
int32_t
)
strlen
(
sql
);
int32_t
sqlLen
=
(
int32_t
)
strlen
(
sql
);
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
tscError
(
"
%p sql too long"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" sql too long"
,
pSql
->
self
);
tfree
(
pSql
);
tfree
(
pSql
);
return
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
}
}
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscError
(
"0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
tscDebug
(
"0x%"
PRIx64
" Valid SQL result:%d, %s pObj:%p"
,
pSql
->
self
,
pRes
->
code
,
taos_errstr
(
pSql
),
pObj
);
tfree
(
pSql
);
tfree
(
pSql
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -914,7 +913,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
...
@@ -914,7 +913,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsc
Debug
(
"0x%"
PRIx64
" V
alid SQL result:%d, %s pObj:%p"
,
pSql
->
self
,
code
,
taos_errstr
(
pSql
),
pObj
);
tsc
Error
(
"0x%"
PRIx64
" inv
alid SQL result:%d, %s pObj:%p"
,
pSql
->
self
,
code
,
taos_errstr
(
pSql
),
pObj
);
}
}
taos_free_result
(
pSql
);
taos_free_result
(
pSql
);
...
@@ -1031,14 +1030,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
...
@@ -1031,14 +1030,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
int32_t
tblListLen
=
(
int32_t
)
strlen
(
tableNameList
);
int32_t
tblListLen
=
(
int32_t
)
strlen
(
tableNameList
);
if
(
tblListLen
>
MAX_TABLE_NAME_LENGTH
)
{
if
(
tblListLen
>
MAX_TABLE_NAME_LENGTH
)
{
tscError
(
"
%p tableNameList too long, length:%d, maximum allowed:%d"
,
pSql
,
tblListLen
,
MAX_TABLE_NAME_LENGTH
);
tscError
(
"
0x%"
PRIx64
" tableNameList too long, length:%d, maximum allowed:%d"
,
pSql
->
self
,
tblListLen
,
MAX_TABLE_NAME_LENGTH
);
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
return
TSDB_CODE_TSC_INVALID_SQL
;
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
char
*
str
=
calloc
(
1
,
tblListLen
+
1
);
char
*
str
=
calloc
(
1
,
tblListLen
+
1
);
if
(
str
==
NULL
)
{
if
(
str
==
NULL
)
{
tscError
(
"
%p failed to malloc sql string buffer"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
...
src/client/src/tscStream.c
浏览文件 @
dceb51d4
...
@@ -194,7 +194,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
...
@@ -194,7 +194,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
if
(
tres
==
NULL
||
numOfRows
<
0
)
{
if
(
tres
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelay
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
int64_t
retryDelay
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
tscError
(
"
%p stream:%p, query data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pStream
->
pSql
,
pStream
,
numOfRows
,
tscError
(
"
0x%"
PRIx64
" stream:%p, query data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pStream
->
pSql
->
self
,
pStream
,
numOfRows
,
retryDelay
);
retryDelay
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pStream
->
pSql
->
cmd
,
0
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pStream
->
pSql
->
cmd
,
0
,
0
);
...
@@ -267,7 +267,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
...
@@ -267,7 +267,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
tscError
(
"
%p stream:%p, retrieve data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pSql
,
pStream
,
numOfRows
,
retryDelayTime
);
tscError
(
"
0x%"
PRIx64
" stream:%p, retrieve data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pSql
->
self
,
pStream
,
numOfRows
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelayTime
);
return
;
return
;
...
@@ -300,7 +300,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
...
@@ -300,7 +300,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
/* no resuls in the query range, retry */
/* no resuls in the query range, retry */
// todo set retry dynamic time
// todo set retry dynamic time
int32_t
retry
=
tsProjectExecInterval
;
int32_t
retry
=
tsProjectExecInterval
;
tscError
(
"
%p stream:%p, retrieve no data, code:0x%08x, retry in %"
PRId32
"ms"
,
pSql
,
pStream
,
numOfRows
,
retry
);
tscError
(
"
0x%"
PRIx64
" stream:%p, retrieve no data, code:0x%08x, retry in %"
PRId32
"ms"
,
pSql
->
self
,
pStream
,
numOfRows
,
retry
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retry
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retry
);
return
;
return
;
...
@@ -448,7 +448,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
...
@@ -448,7 +448,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
}
if
(
pQueryInfo
->
interval
.
intervalUnit
!=
'n'
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'y'
&&
pQueryInfo
->
interval
.
interval
<
minIntervalTime
)
{
if
(
pQueryInfo
->
interval
.
intervalUnit
!=
'n'
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'y'
&&
pQueryInfo
->
interval
.
interval
<
minIntervalTime
)
{
tscWarn
(
"
%p stream:%p, original sample interval:%"
PRId64
" too small, reset to:%"
PRId64
,
pSql
,
pStream
,
tscWarn
(
"
0x%"
PRIx64
" stream:%p, original sample interval:%"
PRId64
" too small, reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
(
int64_t
)
pQueryInfo
->
interval
.
interval
,
minIntervalTime
);
(
int64_t
)
pQueryInfo
->
interval
.
interval
,
minIntervalTime
);
pQueryInfo
->
interval
.
interval
=
minIntervalTime
;
pQueryInfo
->
interval
.
interval
=
minIntervalTime
;
}
}
...
@@ -465,14 +465,14 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
...
@@ -465,14 +465,14 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinSlidingTime
*
1000L
:
tsMinSlidingTime
;
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinSlidingTime
*
1000L
:
tsMinSlidingTime
;
if
(
pQueryInfo
->
interval
.
intervalUnit
!=
'n'
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'y'
&&
pQueryInfo
->
interval
.
sliding
<
minSlidingTime
)
{
if
(
pQueryInfo
->
interval
.
intervalUnit
!=
'n'
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'y'
&&
pQueryInfo
->
interval
.
sliding
<
minSlidingTime
)
{
tscWarn
(
"
%p stream:%p, original sliding value:%"
PRId64
" too small, reset to:%"
PRId64
,
pSql
,
pStream
,
tscWarn
(
"
0x%"
PRIx64
" stream:%p, original sliding value:%"
PRId64
" too small, reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
pQueryInfo
->
interval
.
sliding
,
minSlidingTime
);
pQueryInfo
->
interval
.
sliding
,
minSlidingTime
);
pQueryInfo
->
interval
.
sliding
=
minSlidingTime
;
pQueryInfo
->
interval
.
sliding
=
minSlidingTime
;
}
}
if
(
pQueryInfo
->
interval
.
sliding
>
pQueryInfo
->
interval
.
interval
)
{
if
(
pQueryInfo
->
interval
.
sliding
>
pQueryInfo
->
interval
.
interval
)
{
tscWarn
(
"
%p stream:%p, sliding value:%"
PRId64
" can not be larger than interval range, reset to:%"
PRId64
,
pSql
,
pStream
,
tscWarn
(
"
0x%"
PRIx64
" stream:%p, sliding value:%"
PRId64
" can not be larger than interval range, reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
pQueryInfo
->
interval
.
sliding
,
pQueryInfo
->
interval
.
interval
);
pQueryInfo
->
interval
.
sliding
,
pQueryInfo
->
interval
.
interval
);
pQueryInfo
->
interval
.
sliding
=
pQueryInfo
->
interval
.
interval
;
pQueryInfo
->
interval
.
sliding
=
pQueryInfo
->
interval
.
interval
;
...
@@ -515,7 +515,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
...
@@ -515,7 +515,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
}
else
{
}
else
{
int64_t
newStime
=
taosTimeTruncate
(
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
int64_t
newStime
=
taosTimeTruncate
(
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
if
(
newStime
!=
stime
)
{
if
(
newStime
!=
stime
)
{
tscWarn
(
"
%p stream:%p, last timestamp:%"
PRId64
", reset to:%"
PRId64
,
pSql
,
pStream
,
stime
,
newStime
);
tscWarn
(
"
0x%"
PRIx64
" stream:%p, last timestamp:%"
PRId64
", reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
stime
,
newStime
);
stime
=
newStime
;
stime
=
newStime
;
}
}
}
}
...
@@ -546,7 +546,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
...
@@ -546,7 +546,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
code
;
pSql
->
res
.
code
=
code
;
tscError
(
"
%p open stream failed, sql:%s, reason:%s, code:%s"
,
pSql
,
pSql
->
sqlstr
,
pCmd
->
payload
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" open stream failed, sql:%s, reason:%s, code:%s"
,
pSql
->
self
,
pSql
->
sqlstr
,
pCmd
->
payload
,
tstrerror
(
code
));
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
return
;
...
@@ -565,7 +565,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
...
@@ -565,7 +565,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if
(
tscSetSlidingWindowInfo
(
pSql
,
pStream
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tscSetSlidingWindowInfo
(
pSql
,
pStream
)
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
code
;
pSql
->
res
.
code
=
code
;
tscError
(
"
%p stream %p open failed, since the interval value is incorrect"
,
pSql
,
pStream
);
tscError
(
"
0x%"
PRIx64
" stream %p open failed, since the interval value is incorrect"
,
pSql
->
self
,
pStream
);
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
return
;
}
}
...
@@ -605,7 +605,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -605,7 +605,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlStream
*
pStream
=
(
SSqlStream
*
)
calloc
(
1
,
sizeof
(
SSqlStream
));
SSqlStream
*
pStream
=
(
SSqlStream
*
)
calloc
(
1
,
sizeof
(
SSqlStream
));
if
(
pStream
==
NULL
)
{
if
(
pStream
==
NULL
)
{
tscError
(
"
%p open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
,
sqlstr
,
pCmd
->
payload
,
pRes
->
code
);
tscError
(
"
0x%"
PRIx64
" open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
->
self
,
sqlstr
,
pCmd
->
payload
,
pRes
->
code
);
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
return
NULL
;
return
NULL
;
}
}
...
@@ -621,7 +621,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -621,7 +621,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"
%p failed to malloc sql string buffer"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
return
NULL
;
return
NULL
;
}
}
...
@@ -640,7 +640,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -640,7 +640,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscCreateStream
(
pStream
,
pSql
,
code
);
tscCreateStream
(
pStream
,
pSql
,
code
);
}
else
if
(
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
}
else
if
(
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscError
(
"
%p open stream failed, sql:%s, code:%s"
,
pSql
,
sqlstr
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" open stream failed, sql:%s, code:%s"
,
pSql
->
self
,
sqlstr
,
tstrerror
(
code
));
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
free
(
pStream
);
free
(
pStream
);
return
NULL
;
return
NULL
;
...
...
src/client/src/tscSub.c
浏览文件 @
dceb51d4
...
@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) {
...
@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) {
SSqlObj
*
pNew
=
taos_query
(
pSql
->
pTscObj
,
sql
);
SSqlObj
*
pNew
=
taos_query
(
pSql
->
pTscObj
,
sql
);
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
failed to retrieve table id: cannot create new sql object."
);
tscError
(
"
0x%"
PRIx64
"failed to retrieve table id: cannot create new sql object."
,
pSql
->
self
);
return
NULL
;
return
NULL
;
}
else
if
(
taos_errno
(
pNew
)
!=
TSDB_CODE_SUCCESS
)
{
}
else
if
(
taos_errno
(
pNew
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
failed to retrieve table id: %s"
,
tstrerror
(
taos_errno
(
pNew
)));
tscError
(
"
0x%"
PRIx64
"failed to retrieve table id,error: %s"
,
pSql
->
self
,
tstrerror
(
taos_errno
(
pNew
)));
return
NULL
;
return
NULL
;
}
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
dceb51d4
...
@@ -673,7 +673,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
...
@@ -673,7 +673,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
//prepare the subqueries object failed, abort
//prepare the subqueries object failed, abort
if
(
!
success
)
{
if
(
!
success
)
{
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscError
(
"
%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d"
,
pSql
,
tscError
(
"
0x%"
PRIx64
" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d"
,
pSql
->
self
,
pSql
->
subState
.
numOfSub
,
pSql
->
res
.
code
);
pSql
->
subState
.
numOfSub
,
pSql
->
res
.
code
);
freeJoinSubqueryObj
(
pSql
);
freeJoinSubqueryObj
(
pSql
);
...
@@ -717,7 +717,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
...
@@ -717,7 +717,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
static
int32_t
quitAllSubquery
(
SSqlObj
*
pSqlSub
,
SSqlObj
*
pSqlObj
,
SJoinSupporter
*
pSupporter
)
{
static
int32_t
quitAllSubquery
(
SSqlObj
*
pSqlSub
,
SSqlObj
*
pSqlObj
,
SJoinSupporter
*
pSupporter
)
{
if
(
subAndCheckDone
(
pSqlSub
,
pSqlObj
,
pSupporter
->
subqueryIndex
))
{
if
(
subAndCheckDone
(
pSqlSub
,
pSqlObj
,
pSupporter
->
subqueryIndex
))
{
tscError
(
"
%p all subquery return and query failed, global code:%s"
,
pSqlObj
,
tstrerror
(
pSqlObj
->
res
.
code
));
tscError
(
"
0x%"
PRIx64
" all subquery return and query failed, global code:%s"
,
pSqlObj
->
self
,
tstrerror
(
pSqlObj
->
res
.
code
));
freeJoinSubqueryObj
(
pSqlObj
);
freeJoinSubqueryObj
(
pSqlObj
);
return
0
;
return
0
;
}
}
...
@@ -801,7 +801,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
...
@@ -801,7 +801,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
STableIdInfo
item
=
{.
uid
=
tt
->
uid
,
.
tid
=
tt
->
tid
,
.
key
=
INT64_MIN
};
STableIdInfo
item
=
{.
uid
=
tt
->
uid
,
.
tid
=
tt
->
tid
,
.
key
=
INT64_MIN
};
taosArrayPush
(
vgTables
,
&
item
);
taosArrayPush
(
vgTables
,
&
item
);
tscTrace
(
"
%p tid:%d, uid:%"
PRIu64
",vgId:%d added"
,
pSql
,
tt
->
tid
,
tt
->
uid
,
tt
->
vgId
);
tscTrace
(
"
0x%"
PRIx64
" tid:%d, uid:%"
PRIu64
",vgId:%d added"
,
pSql
->
self
,
tt
->
tid
,
tt
->
uid
,
tt
->
vgId
);
prev
=
tt
;
prev
=
tt
;
}
}
...
@@ -880,7 +880,7 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
...
@@ -880,7 +880,7 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
assert
(
prev
->
vgId
>=
1
&&
p
->
vgId
>=
1
);
assert
(
prev
->
vgId
>=
1
&&
p
->
vgId
>=
1
);
if
(
doCompare
(
prev
->
tag
,
p
->
tag
,
pColSchema
->
type
,
pColSchema
->
bytes
)
==
0
)
{
if
(
doCompare
(
prev
->
tag
,
p
->
tag
,
pColSchema
->
type
,
pColSchema
->
bytes
)
==
0
)
{
tscError
(
"
%p join tags have same value for different table, free all sub SqlObj and quit"
,
pPSqlObj
);
tscError
(
"
0x%"
PRIx64
" join tags have same value for different table, free all sub SqlObj and quit"
,
pPSqlObj
->
self
);
pPSqlObj
->
res
.
code
=
TSDB_CODE_QRY_DUP_JOIN_KEY
;
pPSqlObj
->
res
.
code
=
TSDB_CODE_QRY_DUP_JOIN_KEY
;
return
false
;
return
false
;
}
}
...
@@ -1116,7 +1116,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1116,7 +1116,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
assert
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
));
assert
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
));
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
,
numOfRows
,
pParentSql
->
res
.
code
);
tscError
(
"
0x%"
PRIx64
" abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
->
self
,
numOfRows
,
pParentSql
->
res
.
code
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
return
;
return
;
}
}
...
@@ -1131,7 +1131,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1131,7 +1131,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// todo retry if other subqueries are not failed
// todo retry if other subqueries are not failed
assert
(
numOfRows
<
0
&&
numOfRows
==
taos_errno
(
pSql
));
assert
(
numOfRows
<
0
&&
numOfRows
==
taos_errno
(
pSql
));
tscError
(
"
%p sub query failed, code:%s, index:%d"
,
pSql
,
tstrerror
(
numOfRows
),
pSupporter
->
subqueryIndex
);
tscError
(
"
0x%"
PRIx64
" sub query failed, code:%s, index:%d"
,
pSql
->
self
,
tstrerror
(
numOfRows
),
pSupporter
->
subqueryIndex
);
pParentSql
->
res
.
code
=
numOfRows
;
pParentSql
->
res
.
code
=
numOfRows
;
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
...
@@ -1150,7 +1150,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1150,7 +1150,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// todo handle memory error
// todo handle memory error
char
*
tmp
=
realloc
(
pSupporter
->
pIdTagList
,
length
);
char
*
tmp
=
realloc
(
pSupporter
->
pIdTagList
,
length
);
if
(
tmp
==
NULL
)
{
if
(
tmp
==
NULL
)
{
tscError
(
"
%p failed to malloc memory"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to malloc memory"
,
pSql
->
self
);
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
...
@@ -1270,7 +1270,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1270,7 +1270,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
assert
(
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
));
assert
(
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
));
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
,
numOfRows
,
pParentSql
->
res
.
code
);
tscError
(
"
0x%"
PRIx64
" abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
->
self
,
numOfRows
,
pParentSql
->
res
.
code
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
return
;
return
;
}
}
...
@@ -1284,7 +1284,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1284,7 +1284,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if
(
taos_errno
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
taos_errno
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
// todo retry if other subqueries are not failed yet
// todo retry if other subqueries are not failed yet
assert
(
numOfRows
<
0
&&
numOfRows
==
taos_errno
(
pSql
));
assert
(
numOfRows
<
0
&&
numOfRows
==
taos_errno
(
pSql
));
tscError
(
"
%p sub query failed, code:%s, index:%d"
,
pSql
,
tstrerror
(
numOfRows
),
pSupporter
->
subqueryIndex
);
tscError
(
"
0x%"
PRIx64
" sub query failed, code:%s, index:%d"
,
pSql
->
self
,
tstrerror
(
numOfRows
),
pSupporter
->
subqueryIndex
);
pParentSql
->
res
.
code
=
numOfRows
;
pParentSql
->
res
.
code
=
numOfRows
;
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
...
@@ -1300,7 +1300,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1300,7 +1300,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pSupporter
->
f
=
fopen
(
pSupporter
->
path
,
"wb"
);
pSupporter
->
f
=
fopen
(
pSupporter
->
path
,
"wb"
);
if
(
pSupporter
->
f
==
NULL
)
{
if
(
pSupporter
->
f
==
NULL
)
{
tscError
(
"
%p failed to create tmp file:%s, reason:%s"
,
pSql
,
pSupporter
->
path
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to create tmp file:%s, reason:%s"
,
pSql
->
self
,
pSupporter
->
path
,
strerror
(
errno
));
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
@@ -1320,7 +1320,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
...
@@ -1320,7 +1320,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STSBuf
*
pBuf
=
tsBufCreateFromFile
(
pSupporter
->
path
,
true
);
STSBuf
*
pBuf
=
tsBufCreateFromFile
(
pSupporter
->
path
,
true
);
if
(
pBuf
==
NULL
)
{
// in error process, close the fd
if
(
pBuf
==
NULL
)
{
// in error process, close the fd
tscError
(
"
%p invalid ts comp file from vnode, abort subquery, file size:%d"
,
pSql
,
numOfRows
);
tscError
(
"
0x%"
PRIx64
" invalid ts comp file from vnode, abort subquery, file size:%d"
,
pSql
->
self
,
numOfRows
);
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
pParentSql
->
res
.
code
=
TAOS_SYSTEM_ERROR
(
errno
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
)){
...
@@ -1417,7 +1417,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
...
@@ -1417,7 +1417,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
,
numOfRows
,
pParentSql
->
res
.
code
);
tscError
(
"
0x%"
PRIx64
" abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
->
self
,
numOfRows
,
pParentSql
->
res
.
code
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
return
;
return
;
}
}
...
@@ -1432,7 +1432,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
...
@@ -1432,7 +1432,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
assert
(
numOfRows
==
taos_errno
(
pSql
));
assert
(
numOfRows
==
taos_errno
(
pSql
));
pParentSql
->
res
.
code
=
numOfRows
;
pParentSql
->
res
.
code
=
numOfRows
;
tscError
(
"
%p retrieve failed, index:%d, code:%s"
,
pSql
,
pSupporter
->
subqueryIndex
,
tstrerror
(
numOfRows
));
tscError
(
"
0x%"
PRIx64
" retrieve failed, index:%d, code:%s"
,
pSql
->
self
,
pSupporter
->
subqueryIndex
,
tstrerror
(
numOfRows
));
tscAsyncResultOnError
(
pParentSql
);
tscAsyncResultOnError
(
pParentSql
);
return
;
return
;
...
@@ -1762,7 +1762,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
...
@@ -1762,7 +1762,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// retrieve actual query results from vnode during the second stage join subquery
// retrieve actual query results from vnode during the second stage join subquery
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
,
code
,
pParentSql
->
res
.
code
);
tscError
(
"
0x%"
PRIx64
" abort query due to other subquery failure. code:%d, global code:%d"
,
pSql
->
self
,
code
,
pParentSql
->
res
.
code
);
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
return
;
return
;
}
}
...
@@ -1776,7 +1776,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
...
@@ -1776,7 +1776,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if
(
taos_errno
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
taos_errno
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
assert
(
taos_errno
(
pSql
)
==
code
);
assert
(
taos_errno
(
pSql
)
==
code
);
tscError
(
"
%p abort query, code:%s, global code:%s"
,
pSql
,
tstrerror
(
code
),
tstrerror
(
pParentSql
->
res
.
code
));
tscError
(
"
0x%"
PRIx64
" abort query, code:%s, global code:%s"
,
pSql
->
self
,
tstrerror
(
code
),
tstrerror
(
pParentSql
->
res
.
code
));
pParentSql
->
res
.
code
=
code
;
pParentSql
->
res
.
code
=
code
;
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
if
(
quitAllSubquery
(
pSql
,
pParentSql
,
pSupporter
))
{
...
@@ -2011,7 +2011,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
...
@@ -2011,7 +2011,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
SJoinSupporter
*
pSupporter
=
tscCreateJoinSupporter
(
pSql
,
i
);
SJoinSupporter
*
pSupporter
=
tscCreateJoinSupporter
(
pSql
,
i
);
if
(
pSupporter
==
NULL
)
{
// failed to create support struct, abort current query
if
(
pSupporter
==
NULL
)
{
// failed to create support struct, abort current query
tscError
(
"
%p tableIndex:%d, failed to allocate join support object, abort further query"
,
pSql
,
i
);
tscError
(
"
0x%"
PRIx64
" tableIndex:%d, failed to allocate join support object, abort further query"
,
pSql
->
self
,
i
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
goto
_error
;
}
}
...
@@ -2498,7 +2498,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
...
@@ -2498,7 +2498,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
for
(;
i
<
pState
->
numOfSub
;
++
i
)
{
for
(;
i
<
pState
->
numOfSub
;
++
i
)
{
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
calloc
(
1
,
sizeof
(
SRetrieveSupport
));
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
calloc
(
1
,
sizeof
(
SRetrieveSupport
));
if
(
trs
==
NULL
)
{
if
(
trs
==
NULL
)
{
tscError
(
"
%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s"
,
pSql
->
self
,
i
,
strerror
(
errno
));
break
;
break
;
}
}
...
@@ -2507,7 +2507,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
...
@@ -2507,7 +2507,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
trs
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
if
(
trs
->
localBuffer
==
NULL
)
{
if
(
trs
->
localBuffer
==
NULL
)
{
tscError
(
"
%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s"
,
pSql
->
self
,
i
,
strerror
(
errno
));
tfree
(
trs
);
tfree
(
trs
);
break
;
break
;
}
}
...
@@ -2519,7 +2519,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
...
@@ -2519,7 +2519,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlObj
*
pNew
=
tscCreateSTableSubquery
(
pSql
,
trs
,
NULL
);
SSqlObj
*
pNew
=
tscCreateSTableSubquery
(
pSql
,
trs
,
NULL
);
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s"
,
pSql
,
i
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s"
,
pSql
->
self
,
i
,
strerror
(
errno
));
tfree
(
trs
->
localBuffer
);
tfree
(
trs
->
localBuffer
);
tfree
(
trs
);
tfree
(
trs
);
break
;
break
;
...
@@ -2536,7 +2536,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
...
@@ -2536,7 +2536,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
}
}
if
(
i
<
pState
->
numOfSub
)
{
if
(
i
<
pState
->
numOfSub
)
{
tscError
(
"
%p failed to prepare subquery structure and launch subqueries"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to prepare subquery structure and launch subqueries"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pFinalModel
,
pState
->
numOfSub
);
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pFinalModel
,
pState
->
numOfSub
);
...
@@ -2580,7 +2580,7 @@ static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, i
...
@@ -2580,7 +2580,7 @@ static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, i
static
void
tscAbortFurtherRetryRetrieval
(
SRetrieveSupport
*
trsupport
,
TAOS_RES
*
tres
,
int32_t
code
)
{
static
void
tscAbortFurtherRetryRetrieval
(
SRetrieveSupport
*
trsupport
,
TAOS_RES
*
tres
,
int32_t
code
)
{
// set no disk space error info
// set no disk space error info
tscError
(
"sub:
%p failed to flush data to disk, reason:%s"
,
tres
,
tstrerror
(
code
));
tscError
(
"sub:
0x%"
PRIx64
" failed to flush data to disk, reason:%s"
,
((
SSqlObj
*
)
tres
)
->
self
,
tstrerror
(
code
));
SSqlObj
*
pParentSql
=
trsupport
->
pParentSql
;
SSqlObj
*
pParentSql
=
trsupport
->
pParentSql
;
pParentSql
->
res
.
code
=
code
;
pParentSql
->
res
.
code
=
code
;
...
@@ -2605,7 +2605,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
...
@@ -2605,7 +2605,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
const
uint32_t
nBufferSize
=
(
1u
<<
16u
);
// 64KB
const
uint32_t
nBufferSize
=
(
1u
<<
16u
);
// 64KB
trsupport
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
trsupport
->
localBuffer
=
(
tFilePage
*
)
calloc
(
1
,
nBufferSize
+
sizeof
(
tFilePage
));
if
(
trsupport
->
localBuffer
==
NULL
)
{
if
(
trsupport
->
localBuffer
==
NULL
)
{
tscError
(
"
%p failed to malloc buffer for local buffer, reason:%s"
,
pSql
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to malloc buffer for local buffer, reason:%s"
,
pSql
->
self
,
strerror
(
errno
));
tfree
(
trsupport
);
tfree
(
trsupport
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
...
@@ -2620,13 +2620,13 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
...
@@ -2620,13 +2620,13 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
// clear local saved number of results
// clear local saved number of results
trsupport
->
localBuffer
->
num
=
0
;
trsupport
->
localBuffer
->
num
=
0
;
tscError
(
"
%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d"
,
trsupport
->
pParentSql
,
pSql
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" retrieve/query failed, code:%s, orderOfSub:%d, retry:%d"
,
trsupport
->
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
code
),
subqueryIndex
,
trsupport
->
numOfRetry
);
tstrerror
(
code
),
subqueryIndex
,
trsupport
->
numOfRetry
);
SSqlObj
*
pNew
=
tscCreateSTableSubquery
(
trsupport
->
pParentSql
,
trsupport
,
pSql
);
SSqlObj
*
pNew
=
tscCreateSTableSubquery
(
trsupport
->
pParentSql
,
trsupport
,
pSql
);
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
%p sub:%p
failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d"
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
"
failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d"
,
oriTrs
->
pParentSql
,
pSql
,
tstrerror
(
terrno
),
pVgroup
->
vgId
,
oriTrs
->
subqueryIndex
);
oriTrs
->
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
terrno
),
pVgroup
->
vgId
,
oriTrs
->
subqueryIndex
);
pParentSql
->
res
.
code
=
terrno
;
pParentSql
->
res
.
code
=
terrno
;
oriTrs
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
oriTrs
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
...
@@ -2680,7 +2680,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
...
@@ -2680,7 +2680,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if
(
numOfRows
>=
0
)
{
// current query is successful, but other sub query failed, still abort current query.
if
(
numOfRows
>=
0
)
{
// current query is successful, but other sub query failed, still abort current query.
tscDebug
(
"0x%"
PRIx64
" sub:0x%"
PRIx64
" retrieve numOfRows:%d,orderOfSub:%d"
,
pParentSql
->
self
,
pSql
->
self
,
numOfRows
,
subqueryIndex
);
tscDebug
(
"0x%"
PRIx64
" sub:0x%"
PRIx64
" retrieve numOfRows:%d,orderOfSub:%d"
,
pParentSql
->
self
,
pSql
->
self
,
numOfRows
,
subqueryIndex
);
tscError
(
"
%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%s"
,
pParentSql
,
pSql
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" abort further retrieval due to other queries failure,orderOfSub:%d,code:%s"
,
pParentSql
->
self
,
pSql
->
self
,
subqueryIndex
,
tstrerror
(
pParentSql
->
res
.
code
));
subqueryIndex
,
tstrerror
(
pParentSql
->
res
.
code
));
}
else
{
}
else
{
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
&&
pParentSql
->
res
.
code
==
TSDB_CODE_SUCCESS
)
{
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
&&
pParentSql
->
res
.
code
==
TSDB_CODE_SUCCESS
)
{
...
@@ -2692,7 +2692,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
...
@@ -2692,7 +2692,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
}
}
}
else
{
// reach the maximum retry count, abort
}
else
{
// reach the maximum retry count, abort
atomic_val_compare_exchange_32
(
&
pParentSql
->
res
.
code
,
TSDB_CODE_SUCCESS
,
numOfRows
);
atomic_val_compare_exchange_32
(
&
pParentSql
->
res
.
code
,
TSDB_CODE_SUCCESS
,
numOfRows
);
tscError
(
"
%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s"
,
pParentSql
,
pSql
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s"
,
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
numOfRows
),
subqueryIndex
,
tstrerror
(
pParentSql
->
res
.
code
));
tstrerror
(
numOfRows
),
subqueryIndex
,
tstrerror
(
pParentSql
->
res
.
code
));
}
}
}
}
...
@@ -2705,7 +2705,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
...
@@ -2705,7 +2705,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
}
}
// all subqueries are failed
// all subqueries are failed
tscError
(
"
%p retrieve from %d vnode(s) completed,code:%s.FAILED."
,
pParentSql
,
pState
->
numOfSub
,
tscError
(
"
0x%"
PRIx64
" retrieve from %d vnode(s) completed,code:%s.FAILED."
,
pParentSql
->
self
,
pState
->
numOfSub
,
tstrerror
(
pParentSql
->
res
.
code
));
tstrerror
(
pParentSql
->
res
.
code
));
// release allocated resource
// release allocated resource
...
@@ -2753,7 +2753,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
...
@@ -2753,7 +2753,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
#endif
#endif
if
(
tsTotalTmpDirGB
!=
0
&&
tsAvailTmpDirectorySpace
<
tsReservedTmpDirectorySpace
)
{
if
(
tsTotalTmpDirGB
!=
0
&&
tsAvailTmpDirectorySpace
<
tsReservedTmpDirectorySpace
)
{
tscError
(
"
%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query"
,
pParentSql
,
pSql
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" client disk space remain %.3f GB, need at least %.3f GB, stop query"
,
pParentSql
->
self
,
pSql
->
self
,
tsAvailTmpDirectorySpace
,
tsReservedTmpDirectorySpace
);
tsAvailTmpDirectorySpace
,
tsReservedTmpDirectorySpace
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
pSql
,
TSDB_CODE_TSC_NO_DISKSPACE
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
pSql
,
TSDB_CODE_TSC_NO_DISKSPACE
);
return
;
return
;
...
@@ -2838,7 +2838,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
...
@@ -2838,7 +2838,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
}
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
)
{
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
)
{
tscError
(
"
%p sub:%p failed code:%s, retry:%d"
,
pParentSql
,
pSql
,
tstrerror
(
numOfRows
),
trsupport
->
numOfRetry
);
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" failed code:%s, retry:%d"
,
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
numOfRows
),
trsupport
->
numOfRetry
);
int32_t
sent
=
0
;
int32_t
sent
=
0
;
...
@@ -2866,8 +2866,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
...
@@ -2866,8 +2866,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
pParentSql
->
self
,
pSql
,
pRes
->
numOfRows
,
pState
->
numOfRetrievedRows
,
pSql
->
epSet
.
fqdn
[
pSql
->
epSet
.
inUse
],
idx
);
pParentSql
->
self
,
pSql
,
pRes
->
numOfRows
,
pState
->
numOfRetrievedRows
,
pSql
->
epSet
.
fqdn
[
pSql
->
epSet
.
inUse
],
idx
);
if
(
num
>
tsMaxNumOfOrderedResults
&&
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
if
(
num
>
tsMaxNumOfOrderedResults
&&
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
tscError
(
"
%p sub:%p
num of OrderedRes is too many, max allowed:%"
PRId32
" , current:%"
PRId64
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
"
num of OrderedRes is too many, max allowed:%"
PRId32
" , current:%"
PRId64
,
pParentSql
,
pSql
,
tsMaxNumOfOrderedResults
,
num
);
pParentSql
->
self
,
pSql
->
self
,
tsMaxNumOfOrderedResults
,
num
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_SORTED_RES_TOO_MANY
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_SORTED_RES_TOO_MANY
);
return
;
return
;
}
}
...
@@ -2882,7 +2882,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
...
@@ -2882,7 +2882,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
// no disk space for tmp directory
// no disk space for tmp directory
if
(
tsTotalTmpDirGB
!=
0
&&
tsAvailTmpDirectorySpace
<
tsReservedTmpDirectorySpace
)
{
if
(
tsTotalTmpDirGB
!=
0
&&
tsAvailTmpDirectorySpace
<
tsReservedTmpDirectorySpace
)
{
tscError
(
"
%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query"
,
pParentSql
,
pSql
,
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" client disk space remain %.3f GB, need at least %.3f GB, stop query"
,
pParentSql
->
self
,
pSql
->
self
,
tsAvailTmpDirectorySpace
,
tsReservedTmpDirectorySpace
);
tsAvailTmpDirectorySpace
,
tsReservedTmpDirectorySpace
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_NO_DISKSPACE
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_NO_DISKSPACE
);
return
;
return
;
...
@@ -2951,8 +2951,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
...
@@ -2951,8 +2951,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
// stable query killed or other subquery failed, all query stopped
// stable query killed or other subquery failed, all query stopped
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pParentSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
)
{
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
trsupport
->
numOfRetry
=
MAX_NUM_OF_SUBQUERY_RETRY
;
tscError
(
"
%p query cancelled or failed, sub:%p
, vgId:%d, orderOfSub:%d, code:%s, global code:%s"
,
tscError
(
"
0x%"
PRIx64
" query cancelled or failed, sub:0x%"
PRIx64
"
, vgId:%d, orderOfSub:%d, code:%s, global code:%s"
,
pParentSql
,
pSql
,
pVgroup
->
vgId
,
trsupport
->
subqueryIndex
,
tstrerror
(
code
),
tstrerror
(
pParentSql
->
res
.
code
));
pParentSql
->
self
,
pSql
->
self
,
pVgroup
->
vgId
,
trsupport
->
subqueryIndex
,
tstrerror
(
code
),
tstrerror
(
pParentSql
->
res
.
code
));
tscHandleSubqueryError
(
param
,
tres
,
code
);
tscHandleSubqueryError
(
param
,
tres
,
code
);
return
;
return
;
...
@@ -2969,7 +2969,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
...
@@ -2969,7 +2969,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert
(
code
==
taos_errno
(
pSql
));
assert
(
code
==
taos_errno
(
pSql
));
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
)
{
if
(
trsupport
->
numOfRetry
++
<
MAX_NUM_OF_SUBQUERY_RETRY
)
{
tscError
(
"
%p sub:%p failed code:%s, retry:%d"
,
pParentSql
,
pSql
,
tstrerror
(
code
),
trsupport
->
numOfRetry
);
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" failed code:%s, retry:%d"
,
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
code
),
trsupport
->
numOfRetry
);
int32_t
sent
=
0
;
int32_t
sent
=
0
;
...
@@ -2978,7 +2978,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
...
@@ -2978,7 +2978,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
return
;
return
;
}
}
}
else
{
}
else
{
tscError
(
"
%p sub:%p reach the max retry times, set global code:%s"
,
pParentSql
,
pSql
,
tstrerror
(
code
));
tscError
(
"
0x%"
PRIx64
" sub:0x%"
PRIx64
" reach the max retry times, set global code:%s"
,
pParentSql
->
self
,
pSql
->
self
,
tstrerror
(
code
));
atomic_val_compare_exchange_32
(
&
pParentSql
->
res
.
code
,
TSDB_CODE_SUCCESS
,
code
);
// set global code and abort
atomic_val_compare_exchange_32
(
&
pParentSql
->
res
.
code
,
TSDB_CODE_SUCCESS
,
code
);
// set global code and abort
}
}
...
@@ -2998,7 +2998,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
...
@@ -2998,7 +2998,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
static
bool
needRetryInsert
(
SSqlObj
*
pParentObj
,
int32_t
numOfSub
)
{
static
bool
needRetryInsert
(
SSqlObj
*
pParentObj
,
int32_t
numOfSub
)
{
if
(
pParentObj
->
retry
>
pParentObj
->
maxRetry
)
{
if
(
pParentObj
->
retry
>
pParentObj
->
maxRetry
)
{
tscError
(
"
%p max retry reached, abort the retry effort"
,
pParentObj
);
tscError
(
"
0x%"
PRIx64
" max retry reached, abort the retry effort"
,
pParentObj
->
self
);
return
false
;
return
false
;
}
}
...
@@ -3090,7 +3090,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
...
@@ -3090,7 +3090,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
}
}
}
tscError
(
"
%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d"
,
pParentObj
,
tscError
(
"
0x%"
PRIx64
" Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d"
,
pParentObj
->
self
,
pParentObj
->
res
.
numOfRows
,
numOfFailed
,
numOfSub
);
pParentObj
->
res
.
numOfRows
,
numOfFailed
,
numOfSub
);
tscDebug
(
"0x%"
PRIx64
" cleanup %d tableMeta in hashTable"
,
pParentObj
->
self
,
pParentObj
->
cmd
.
numOfTables
);
tscDebug
(
"0x%"
PRIx64
" cleanup %d tableMeta in hashTable"
,
pParentObj
->
self
,
pParentObj
->
cmd
.
numOfTables
);
...
@@ -3207,7 +3207,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
...
@@ -3207,7 +3207,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlObj
*
pNew
=
createSimpleSubObj
(
pSql
,
multiVnodeInsertFinalize
,
pSupporter
,
TSDB_SQL_INSERT
);
SSqlObj
*
pNew
=
createSimpleSubObj
(
pSql
,
multiVnodeInsertFinalize
,
pSupporter
,
TSDB_SQL_INSERT
);
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s"
,
pSql
,
numOfSub
,
strerror
(
errno
));
tscError
(
"
0x%"
PRIx64
" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s"
,
pSql
->
self
,
numOfSub
,
strerror
(
errno
));
goto
_error
;
goto
_error
;
}
}
...
@@ -3231,7 +3231,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
...
@@ -3231,7 +3231,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
}
}
if
(
numOfSub
<
pSql
->
subState
.
numOfSub
)
{
if
(
numOfSub
<
pSql
->
subState
.
numOfSub
)
{
tscError
(
"
%p failed to prepare subObj structure and launch sub-insertion"
,
pSql
);
tscError
(
"
0x%"
PRIx64
" failed to prepare subObj structure and launch sub-insertion"
,
pSql
->
self
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
goto
_error
;
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
dceb51d4
...
@@ -913,7 +913,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
...
@@ -913,7 +913,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
int32_t
ret
=
tscGetDataBlockFromList
(
pVnodeDataBlockHashList
,
pOneTableBlock
->
vgId
,
TSDB_PAYLOAD_SIZE
,
int32_t
ret
=
tscGetDataBlockFromList
(
pVnodeDataBlockHashList
,
pOneTableBlock
->
vgId
,
TSDB_PAYLOAD_SIZE
,
INSERT_HEAD_SIZE
,
0
,
&
pOneTableBlock
->
tableName
,
pOneTableBlock
->
pTableMeta
,
&
dataBuf
,
pVnodeDataBlockList
);
INSERT_HEAD_SIZE
,
0
,
&
pOneTableBlock
->
tableName
,
pOneTableBlock
->
pTableMeta
,
&
dataBuf
,
pVnodeDataBlockList
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p failed to prepare the data block buffer for merging table data, code:%d"
,
pSql
,
ret
);
tscError
(
"
0x%"
PRIx64
" failed to prepare the data block buffer for merging table data, code:%d"
,
pSql
->
self
,
ret
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
return
ret
;
return
ret
;
...
@@ -932,7 +932,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
...
@@ -932,7 +932,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
dataBuf
->
pData
=
tmp
;
dataBuf
->
pData
=
tmp
;
memset
(
dataBuf
->
pData
+
dataBuf
->
size
,
0
,
dataBuf
->
nAllocSize
-
dataBuf
->
size
);
memset
(
dataBuf
->
pData
+
dataBuf
->
size
,
0
,
dataBuf
->
nAllocSize
-
dataBuf
->
size
);
}
else
{
// failed to allocate memory, free already allocated memory and return error code
}
else
{
// failed to allocate memory, free already allocated memory and return error code
tscError
(
"
%p failed to allocate memory for merging submit block, size:%d"
,
pSql
,
dataBuf
->
nAllocSize
);
tscError
(
"
0x%"
PRIx64
" failed to allocate memory for merging submit block, size:%d"
,
pSql
->
self
,
dataBuf
->
nAllocSize
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
...
@@ -2115,7 +2115,7 @@ void registerSqlObj(SSqlObj* pSql) {
...
@@ -2115,7 +2115,7 @@ void registerSqlObj(SSqlObj* pSql) {
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
)
{
SSqlObj
*
createSimpleSubObj
(
SSqlObj
*
pSql
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
)
{
SSqlObj
*
pNew
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
SSqlObj
*
pNew
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
%p new subquery failed, tableIndex:%d"
,
pSql
,
0
);
tscError
(
"
0x%"
PRIx64
" new subquery failed, tableIndex:%d"
,
pSql
->
self
,
0
);
return
NULL
;
return
NULL
;
}
}
...
@@ -2129,7 +2129,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
...
@@ -2129,7 +2129,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
int32_t
code
=
copyTagData
(
&
pNew
->
cmd
.
tagData
,
&
pSql
->
cmd
.
tagData
);
int32_t
code
=
copyTagData
(
&
pNew
->
cmd
.
tagData
,
&
pSql
->
cmd
.
tagData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p new subquery failed, unable to malloc tag data, tableIndex:%d"
,
pSql
,
0
);
tscError
(
"
0x%"
PRIx64
" new subquery failed, unable to malloc tag data, tableIndex:%d"
,
pSql
->
self
,
0
);
free
(
pNew
);
free
(
pNew
);
return
NULL
;
return
NULL
;
}
}
...
@@ -2206,7 +2206,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
...
@@ -2206,7 +2206,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
SSqlObj
*
pNew
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
SSqlObj
*
pNew
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pNew
==
NULL
)
{
if
(
pNew
==
NULL
)
{
tscError
(
"
%p new subquery failed, tableIndex:%d"
,
pSql
,
tableIndex
);
tscError
(
"
0x%"
PRIx64
" new subquery failed, tableIndex:%d"
,
pSql
->
self
,
tableIndex
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
...
@@ -2295,7 +2295,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
...
@@ -2295,7 +2295,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
}
}
if
(
tscAllocPayload
(
pnCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tscAllocPayload
(
pnCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"
%p new subquery failed, tableIndex:%d, vgroupIndex:%d"
,
pSql
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
);
tscError
(
"
0x%"
PRIx64
" new subquery failed, tableIndex:%d, vgroupIndex:%d"
,
pSql
->
self
,
tableIndex
,
pTableMetaInfo
->
vgroupIndex
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
goto
_error
;
}
}
...
@@ -2346,7 +2346,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
...
@@ -2346,7 +2346,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
// this case cannot be happened
// this case cannot be happened
if
(
pFinalInfo
->
pTableMeta
==
NULL
)
{
if
(
pFinalInfo
->
pTableMeta
==
NULL
)
{
tscError
(
"
%p new subquery failed since no tableMeta, name:%s"
,
pSql
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
tscError
(
"
0x%"
PRIx64
" new subquery failed since no tableMeta, name:%s"
,
pSql
->
self
,
tNameGetTableName
(
&
pTableMetaInfo
->
name
));
if
(
pPrevSql
!=
NULL
)
{
// pass the previous error to client
if
(
pPrevSql
!=
NULL
)
{
// pass the previous error to client
assert
(
pPrevSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
);
assert
(
pPrevSql
->
res
.
code
!=
TSDB_CODE_SUCCESS
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录