Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
574f9d4c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
574f9d4c
编写于
11月 27, 2019
作者:
S
slguan
提交者:
GitHub
11月 27, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #786 from taosdata/feature/liaohj
[tbase-874]
上级
7570c7ff
f0b93abb
变更
13
展开全部
隐藏空白更改
内联
并排
Showing
13 changed file
with
443 addition
and
714 deletion
+443
-714
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+40
-47
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+3
-3
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+80
-121
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+256
-460
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+1
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+2
-29
src/client/src/tscSql.c
src/client/src/tscSql.c
+18
-10
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+26
-2
src/inc/taosmsg.h
src/inc/taosmsg.h
+0
-1
src/system/detail/inc/vnode.h
src/system/detail/inc/vnode.h
+1
-1
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+11
-29
src/system/detail/src/vnodeRead.c
src/system/detail/src/vnodeRead.c
+4
-7
src/system/detail/src/vnodeShell.c
src/system/detail/src/vnodeShell.c
+1
-4
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
574f9d4c
...
...
@@ -34,8 +34,8 @@ extern "C" {
#include "tglobalcfg.h"
#include "tlog.h"
#include "tscCache.h"
#include "tsdb.h"
#include "tscSQLParser.h"
#include "tsdb.h"
#include "tsqlfunction.h"
#include "tutil.h"
...
...
@@ -219,22 +219,22 @@ typedef struct STagCond {
}
STagCond
;
typedef
struct
SParamInfo
{
int32_t
idx
;
char
type
;
uint8_t
timePrec
;
short
bytes
;
int32_t
idx
;
char
type
;
uint8_t
timePrec
;
short
bytes
;
uint32_t
offset
;
}
SParamInfo
;
typedef
struct
STableDataBlocks
{
char
meterId
[
TSDB_METER_ID_LEN
];
int8_t
tsSource
;
bool
ordered
;
bool
ordered
;
int64_t
vgid
;
int64_t
prevTS
;
int32_t
numOfMeters
;
int32_t
numOfMeters
;
int32_t
rowSize
;
uint32_t
nAllocSize
;
...
...
@@ -245,9 +245,9 @@ typedef struct STableDataBlocks {
};
// for parameter ('?') binding
uint32_t
numOfAllocedParams
;
uint32_t
numOfParams
;
SParamInfo
*
params
;
uint32_t
numOfAllocedParams
;
uint32_t
numOfParams
;
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SDataBlockList
{
...
...
@@ -262,18 +262,17 @@ typedef struct SDataBlockList {
typedef
struct
{
SOrderVal
order
;
int
command
;
// TODO refactor
int
count
;
int16_t
isInsertFromFile
;
// load data from file or not
int
count
;
// TODO refactor
union
{
bool
existsCheck
;
int8_t
showType
;
bool
existsCheck
;
// check if the table exists
int8_t
showType
;
// show command type
int8_t
isInsertFromFile
;
// load data from file or not
};
bool
import
;
// import/insert type
char
msgType
;
uint16_t
type
;
uint16_t
type
;
// query type
char
intervalTimeUnit
;
int64_t
etime
,
stime
;
int64_t
nAggTimeInterval
;
// aggregation time interval
...
...
@@ -286,20 +285,20 @@ typedef struct {
*
* In such cases, allocate the memory dynamically, and need to free the memory
*/
uint32_t
allocSize
;
char
*
payload
;
int
payloadLen
;
short
numOfCols
;
uint32_t
allocSize
;
char
*
payload
;
int
payloadLen
;
short
numOfCols
;
SColumnBaseInfo
colList
;
SFieldInfo
fieldsInfo
;
SSqlExprInfo
exprsInfo
;
SLimitVal
limit
;
SLimitVal
slimit
;
int64_t
globalLimit
;
STagCond
tagCond
;
int16_t
vnodeIdx
;
// vnode index in pMetricMeta for metric query
int16_t
interpoType
;
// interpolate type
int16_t
numOfTables
;
SFieldInfo
fieldsInfo
;
SSqlExprInfo
exprsInfo
;
SLimitVal
limit
;
SLimitVal
slimit
;
int64_t
globalLimit
;
STagCond
tagCond
;
int16_t
vnodeIdx
;
// vnode index in pMetricMeta for metric query
int16_t
interpoType
;
// interpolate type
int16_t
numOfTables
;
// submit data blocks branched according to vnode
SDataBlockList
*
pDataBlocks
;
...
...
@@ -430,11 +429,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion);
void
tscInitMsgs
();
void
*
tscProcessMsgFromServer
(
char
*
msg
,
void
*
ahandle
,
void
*
thandle
);
int
tscProcessSql
(
SSqlObj
*
pSql
);
int
tscProcessSql
(
SSqlObj
*
pSql
);
void
tscAsyncInsertMultiVnodesProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
int
tscRenewMeterMeta
(
SSqlObj
*
pSql
,
char
*
meterId
);
int
tscRenewMeterMeta
(
SSqlObj
*
pSql
,
char
*
meterId
);
void
tscQueueAsyncRes
(
SSqlObj
*
pSql
);
void
tscQueueAsyncError
(
void
(
*
fp
),
void
*
param
);
...
...
@@ -448,18 +447,12 @@ int taos_retrieve(TAOS_RES *res);
* before send query message to vnode
*/
int32_t
tscTansformSQLFunctionForMetricQuery
(
SSqlCmd
*
pCmd
);
void
tscRestoreSQLFunctionForMetricQuery
(
SSqlCmd
*
pCmd
);
/**
* release both metric/meter meta information
* @param pCmd SSqlCmd object that contains the metric/meter meta info
*/
void
tscClearSqlMetaInfo
(
SSqlCmd
*
pCmd
);
void
tscRestoreSQLFunctionForMetricQuery
(
SSqlCmd
*
pCmd
);
void
tscClearSqlMetaInfoForce
(
SSqlCmd
*
pCmd
);
int32_t
tscCreateResPointerInfo
(
SSqlCmd
*
pCmd
,
SSqlRes
*
pRes
);
void
tscDestroyResPointerInfo
(
SSqlRes
*
pRes
);
void
tscDestroyResPointerInfo
(
SSqlRes
*
pRes
);
void
tscFreeSqlCmdData
(
SSqlCmd
*
pCmd
);
...
...
@@ -479,12 +472,12 @@ void tscFreeSqlObj(SSqlObj *pObj);
void
tscCloseTscObj
(
STscObj
*
pObj
);
void
tscProcessMultiVnodesInsert
(
SSqlObj
*
pSql
);
void
tscProcessMultiVnodesInsertForFile
(
SSqlObj
*
pSql
);
void
tscKillMetricQuery
(
SSqlObj
*
pSql
);
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pObj
,
int32_t
numOfRes
,
int32_t
rowLen
);
int32_t
tscBuildResultsForEmptyRetrieval
(
SSqlObj
*
pSql
);
bool
tscIsUpdateQuery
(
STscObj
*
pObj
);
void
tscProcessMultiVnodesInsert
(
SSqlObj
*
pSql
);
void
tscProcessMultiVnodesInsertForFile
(
SSqlObj
*
pSql
);
void
tscKillMetricQuery
(
SSqlObj
*
pSql
);
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pObj
,
int32_t
numOfRes
,
int32_t
rowLen
);
bool
tscIsUpdateQuery
(
STscObj
*
pObj
);
int32_t
tscInvalidSQLErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
// transfer SSqlInfo to SqlCmd struct
int32_t
tscToSQLCmd
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
...
...
src/client/src/tscAsync.c
浏览文件 @
574f9d4c
...
...
@@ -40,6 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
*/
static
void
tscProcessAsyncFetchRowsProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
// TODO return the correct error code to client in tscQueueAsyncError
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
int
),
void
*
param
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
...
...
@@ -54,18 +55,17 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscError
(
"sql string too long"
);
tscQueueAsyncError
(
fp
,
param
);
return
;
}
}
taosNotePrintTsc
(
sqlstr
);
SSqlObj
*
pSql
=
(
SSqlObj
*
)
malloc
(
sizeof
(
SSqlObj
));
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
tscQueueAsyncError
(
fp
,
param
);
return
;
}
memset
(
pSql
,
0
,
sizeof
(
SSqlObj
));
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
...
...
src/client/src/tscParseInsert.c
浏览文件 @
574f9d4c
此差异已折叠。
点击以展开。
src/client/src/tscSQLParser.c
浏览文件 @
574f9d4c
此差异已折叠。
点击以展开。
src/client/src/tscSchemaUtil.c
浏览文件 @
574f9d4c
...
...
@@ -123,6 +123,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
return
memcmp
(
p1
,
p2
,
size
)
==
0
;
}
//todo refactor
static
FORCE_INLINE
char
*
skipSegments
(
char
*
input
,
char
delimiter
,
int32_t
num
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
while
(
*
input
!=
0
&&
*
input
++
!=
delimiter
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
574f9d4c
...
...
@@ -1415,7 +1415,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pMsg
=
pStart
;
pShellMsg
=
(
SShellSubmitMsg
*
)
pMsg
;
pShellMsg
->
import
=
pSql
->
cmd
.
order
.
order
;
pShellMsg
->
import
=
pSql
->
cmd
.
import
;
pShellMsg
->
vnode
=
htons
(
pMeterMeta
->
vpeerDesc
[
pMeterMeta
->
index
].
vnode
);
pShellMsg
->
numOfSid
=
htonl
(
pSql
->
cmd
.
count
);
// number of meters to be inserted
...
...
@@ -3453,31 +3453,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return
0
;
}
static
void
doDecompressPayload
(
SSqlCmd
*
pCmd
,
SSqlRes
*
pRes
,
int16_t
compressed
)
{
if
(
compressed
&&
pRes
->
numOfRows
>
0
)
{
SRetrieveMeterRsp
*
pRetrieve
=
(
SRetrieveMeterRsp
*
)
pRes
->
pRsp
;
int32_t
numOfTotalCols
=
pCmd
->
fieldsInfo
.
numOfOutputCols
+
pCmd
->
fieldsInfo
.
numOfHiddenCols
;
int32_t
rowSize
=
pCmd
->
fieldsInfo
.
pOffset
[
numOfTotalCols
-
1
]
+
pCmd
->
fieldsInfo
.
pFields
[
numOfTotalCols
-
1
].
bytes
;
// TODO handle the OOM problem
char
*
buf
=
malloc
(
rowSize
*
pRes
->
numOfRows
);
int32_t
payloadSize
=
pRes
->
rspLen
-
1
-
sizeof
(
SRetrieveMeterRsp
);
assert
(
payloadSize
>
0
);
int32_t
decompressedSize
=
tsDecompressString
(
pRetrieve
->
data
,
payloadSize
,
1
,
buf
,
rowSize
*
pRes
->
numOfRows
,
0
,
0
,
0
);
assert
(
decompressedSize
==
rowSize
*
pRes
->
numOfRows
);
pRes
->
pRsp
=
realloc
(
pRes
->
pRsp
,
pRes
->
rspLen
-
payloadSize
+
decompressedSize
);
memcpy
(
pRes
->
pRsp
+
sizeof
(
SRetrieveMeterRsp
),
buf
,
decompressedSize
);
free
(
buf
);
}
pRes
->
data
=
((
SRetrieveMeterRsp
*
)
pRes
->
pRsp
)
->
data
;
}
int
tscProcessRetrieveRspFromVnode
(
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -3490,9 +3465,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes
->
offset
=
htobe64
(
pRetrieve
->
offset
);
pRes
->
useconds
=
htobe64
(
pRetrieve
->
useconds
);
pRetrieve
->
compress
=
htons
(
pRetrieve
->
compress
);
doDecompressPayload
(
pCmd
,
pRes
,
pRetrieve
->
compress
);
pRes
->
data
=
pRetrieve
->
data
;
tscSetResultPointer
(
pCmd
,
pRes
);
pRes
->
row
=
0
;
...
...
src/client/src/tscSql.c
浏览文件 @
574f9d4c
...
...
@@ -246,7 +246,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
tscDoQuery
(
pSql
);
}
tscTrace
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
pObj
),
pObj
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
pObj
),
pObj
);
}
else
{
tscError
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
pObj
),
pObj
);
}
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
tscFreeSqlObjPartial
(
pSql
);
}
...
...
@@ -266,8 +271,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
size_t
sqlLen
=
strlen
(
sqlstr
);
if
(
sqlLen
>
TSDB_MAX_SQL_LEN
)
{
tscError
(
"%p sql too long"
,
pSql
);
pRes
->
code
=
TSDB_CODE_INVALID_SQL
;
pRes
->
code
=
tscInvalidSQLErrMsg
(
pSql
->
cmd
.
payload
,
"sql too long"
,
NULL
);
// set the additional error msg for invalid sql
tscError
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
taos
),
pObj
);
return
pRes
->
code
;
}
...
...
@@ -276,8 +282,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
void
*
sql
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
sql
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscTrace
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
taos
),
pObj
);
tscError
(
"%p failed to malloc sql string buffer, reason:%s"
,
pSql
,
strerror
(
errno
));
tscError
(
"%p SQL result:%d, %s pObj:%p"
,
pSql
,
pRes
->
code
,
taos_errstr
(
taos
),
pObj
);
return
pRes
->
code
;
}
...
...
@@ -777,9 +784,9 @@ int taos_errno(TAOS *taos) {
}
char
*
taos_errstr
(
TAOS
*
taos
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
u
nsigned
char
code
;
char
temp
[
256
]
=
{
0
};
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
u
int8_t
code
;
//
char temp[256] = {0};
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
tsError
[
globalCode
];
...
...
@@ -788,9 +795,10 @@ char *taos_errstr(TAOS *taos) {
else
code
=
pObj
->
pSql
->
res
.
code
;
// for invalid sql, additional information is attached to explain why the sql is invalid
if
(
code
==
TSDB_CODE_INVALID_SQL
)
{
snprintf
(
temp
,
tListLen
(
temp
),
"invalid SQL: %s"
,
pObj
->
pSql
->
cmd
.
payload
);
strcpy
(
pObj
->
pSql
->
cmd
.
payload
,
temp
);
//
snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
//
strcpy(pObj->pSql->cmd.payload, temp);
return
pObj
->
pSql
->
cmd
.
payload
;
}
else
{
return
tsError
[
code
];
...
...
src/client/src/tscUtil.c
浏览文件 @
574f9d4c
...
...
@@ -1294,8 +1294,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
// re-build the whole name string
if
(
pStr
[
firstPartLen
]
==
TS_PATH_DELIMITER
[
0
])
{
// first part do not have quote
// do nothing
// first part do not have quote do nothing
}
else
{
pStr
[
firstPartLen
]
=
TS_PATH_DELIMITER
[
0
];
memmove
(
&
pStr
[
firstPartLen
+
1
],
pToken
->
z
,
pToken
->
n
);
...
...
@@ -1842,5 +1841,30 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd
*
pCmd
=
&
pObj
->
pSql
->
cmd
;
return
((
pCmd
->
command
>=
TSDB_SQL_INSERT
&&
pCmd
->
command
<=
TSDB_SQL_DROP_DNODE
)
||
TSDB_SQL_USE_DB
==
pCmd
->
command
)
?
1
:
0
;
}
int32_t
tscInvalidSQLErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
)
{
const
char
*
msgFormat1
=
"invalid SQL: %s"
;
const
char
*
msgFormat2
=
"invalid SQL: syntax error near
\"
%s
\"
(%s)"
;
const
char
*
msgFormat3
=
"invalid SQL: syntax error near
\"
%s
\"
"
;
const
int32_t
BACKWARD_CHAR_STEP
=
0
;
if
(
sql
==
NULL
)
{
assert
(
additionalInfo
!=
NULL
);
sprintf
(
msg
,
msgFormat1
,
additionalInfo
);
return
TSDB_CODE_INVALID_SQL
;
}
char
buf
[
64
]
=
{
0
};
// only extract part of sql string
strncpy
(
buf
,
(
sql
-
BACKWARD_CHAR_STEP
),
tListLen
(
buf
)
-
1
);
if
(
additionalInfo
!=
NULL
)
{
sprintf
(
msg
,
msgFormat2
,
buf
,
additionalInfo
);
}
else
{
sprintf
(
msg
,
msgFormat3
,
buf
);
// no additional information for invalid sql error
}
return
TSDB_CODE_INVALID_SQL
;
}
src/inc/taosmsg.h
浏览文件 @
574f9d4c
...
...
@@ -568,7 +568,6 @@ typedef struct {
typedef
struct
{
int32_t
numOfRows
;
int16_t
precision
;
int16_t
compress
;
int64_t
offset
;
// updated offset value for multi-vnode projection query
int64_t
useconds
;
char
data
[];
...
...
src/system/detail/inc/vnode.h
浏览文件 @
574f9d4c
...
...
@@ -353,7 +353,7 @@ bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
int32_t
vnodeGetResultSize
(
void
*
handle
,
int32_t
*
numOfRows
);
int32_t
vnodeCopyQueryResultToMsg
(
void
*
handle
,
char
*
data
,
int32_t
numOfRows
,
int32_t
*
size
);
int32_t
vnodeCopyQueryResultToMsg
(
void
*
handle
,
char
*
data
,
int32_t
numOfRows
);
int64_t
vnodeGetOffsetVal
(
void
*
thandle
);
...
...
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
574f9d4c
...
...
@@ -6943,36 +6943,18 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
return
numOfRes
;
}
static
void
doCopyQueryResultToMsg
(
SQInfo
*
pQInfo
,
int32_t
numOfRows
,
char
*
data
,
int32_t
*
size
)
{
static
void
doCopyQueryResultToMsg
(
SQInfo
*
pQInfo
,
int32_t
numOfRows
,
char
*
data
)
{
SMeterObj
*
pObj
=
pQInfo
->
pObj
;
SQuery
*
pQuery
=
&
pQInfo
->
query
;
int
tnumOfRows
=
vnodeList
[
pObj
->
vnode
].
cfg
.
rowsInFileBlock
;
int32_t
dataSize
=
pQInfo
->
query
.
rowSize
*
numOfRows
;
if
(
dataSize
>=
tsCompressMsgSize
&&
tsCompressMsgSize
>
0
)
{
char
*
compBuf
=
malloc
((
size_t
)
dataSize
);
// for metric query, bufIndex always be 0.
char
*
d
=
compBuf
;
for
(
int32_t
col
=
0
;
col
<
pQuery
->
numOfOutputCols
;
++
col
)
{
// pQInfo->bufIndex == 0
int32_t
bytes
=
pQuery
->
pSelectExpr
[
col
].
resBytes
;
memmove
(
d
,
pQuery
->
sdata
[
col
]
->
data
+
bytes
*
tnumOfRows
*
pQInfo
->
bufIndex
,
bytes
*
numOfRows
);
d
+=
bytes
*
numOfRows
;
}
*
size
=
tsCompressString
(
compBuf
,
dataSize
,
1
,
data
,
dataSize
+
EXTRA_BYTES
,
0
,
0
,
0
);
dTrace
(
"QInfo:%p compress rsp msg, before:%d, after:%d"
,
pQInfo
,
dataSize
,
*
size
);
free
(
compBuf
);
}
else
{
// for metric query, bufIndex always be 0.
for
(
int32_t
col
=
0
;
col
<
pQuery
->
numOfOutputCols
;
++
col
)
{
// pQInfo->bufIndex == 0
int32_t
bytes
=
pQuery
->
pSelectExpr
[
col
].
resBytes
;
int
tnumOfRows
=
vnodeList
[
pObj
->
vnode
].
cfg
.
rowsInFileBlock
;
// for metric query, bufIndex always be 0.
for
(
int32_t
col
=
0
;
col
<
pQuery
->
numOfOutputCols
;
++
col
)
{
// pQInfo->bufIndex == 0
int32_t
bytes
=
pQuery
->
pSelectExpr
[
col
].
resBytes
;
memmove
(
data
,
pQuery
->
sdata
[
col
]
->
data
+
bytes
*
tnumOfRows
*
pQInfo
->
bufIndex
,
bytes
*
numOfRows
);
data
+=
bytes
*
numOfRows
;
}
memmove
(
data
,
pQuery
->
sdata
[
col
]
->
data
+
bytes
*
tnumOfRows
*
pQInfo
->
bufIndex
,
bytes
*
numOfRows
);
data
+=
bytes
*
numOfRows
;
}
}
...
...
@@ -6987,7 +6969,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
* @param numOfRows the number of rows that are not returned in current retrieve
* @return
*/
int32_t
vnodeCopyQueryResultToMsg
(
void
*
handle
,
char
*
data
,
int32_t
numOfRows
,
int32_t
*
size
)
{
int32_t
vnodeCopyQueryResultToMsg
(
void
*
handle
,
char
*
data
,
int32_t
numOfRows
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
handle
;
SQuery
*
pQuery
=
&
pQInfo
->
query
;
...
...
@@ -7000,7 +6982,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
// make sure file exist
if
(
VALIDFD
(
fd
))
{
size_t
s
=
lseek
(
fd
,
0
,
SEEK_END
);
dTrace
(
"QInfo:%p ts comp data return, file:%s, size:%l
d"
,
pQInfo
,
pQuery
->
sdata
[
0
]
->
data
,
size
);
dTrace
(
"QInfo:%p ts comp data return, file:%s, size:%l
ld"
,
pQInfo
,
pQuery
->
sdata
[
0
]
->
data
,
s
);
lseek
(
fd
,
0
,
SEEK_SET
);
read
(
fd
,
data
,
s
);
...
...
@@ -7012,7 +6994,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
pQuery
->
sdata
[
0
]
->
data
,
strerror
(
errno
));
}
}
else
{
doCopyQueryResultToMsg
(
pQInfo
,
numOfRows
,
data
,
size
);
doCopyQueryResultToMsg
(
pQInfo
,
numOfRows
,
data
);
}
return
numOfRows
;
...
...
src/system/detail/src/vnodeRead.c
浏览文件 @
574f9d4c
...
...
@@ -850,7 +850,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
// the remained number of retrieved rows, not the interpolated result
int
numOfRows
=
pQInfo
->
pointsRead
-
pQInfo
->
pointsReturned
;
int32_t
numOfFinal
=
vnodeCopyQueryResultToMsg
(
pQInfo
,
data
,
numOfRows
,
size
);
int32_t
numOfFinal
=
vnodeCopyQueryResultToMsg
(
pQInfo
,
data
,
numOfRows
);
pQInfo
->
pointsReturned
+=
numOfFinal
;
dTrace
(
"QInfo:%p %d are returned, totalReturned:%d totalRead:%d"
,
pQInfo
,
numOfFinal
,
pQInfo
->
pointsReturned
,
...
...
@@ -862,12 +862,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
uint64_t
oldSignature
=
TSDB_QINFO_SET_QUERY_FLAG
(
pQInfo
);
/*
* If SQInfo has been released, the value of signature cannot be equalled to
* the address of pQInfo, since in release function, the original value has
* been
* destroyed. However, this memory area may be reused by another function.
* It may be 0 or any value, but it is rarely still be equalled to the address
* of SQInfo.
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
* since in release function, the original value has been destroyed. However, this memory area may be reused
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
*/
if
(
oldSignature
==
0
||
oldSignature
!=
(
uint64_t
)
pQInfo
)
{
dTrace
(
"%p freed or killed, old sig:%p abort query"
,
pQInfo
,
oldSignature
);
...
...
src/system/detail/src/vnodeShell.c
浏览文件 @
574f9d4c
...
...
@@ -452,11 +452,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
pMsg
=
pRsp
->
data
;
if
(
numOfRows
>
0
&&
code
==
TSDB_CODE_SUCCESS
)
{
int32_t
oldSize
=
size
;
vnodeSaveQueryResult
((
void
*
)(
pRetrieve
->
qhandle
),
pRsp
->
data
,
&
size
);
if
(
oldSize
>
size
)
{
pRsp
->
compress
=
htons
(
1
);
// denote that the response msg is compressed
}
}
pMsg
+=
size
;
...
...
@@ -573,6 +569,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int
sversion
=
htonl
(
pBlocks
->
sversion
);
if
(
pSubmit
->
import
)
{
dTrace
(
"start to import data"
);
code
=
vnodeImportPoints
(
pMeterObj
,
(
char
*
)
&
(
pBlocks
->
numOfRows
),
subMsgLen
,
TSDB_DATA_SOURCE_SHELL
,
pObj
,
sversion
,
&
numOfPoints
,
now
);
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录