Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f159fdb3
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f159fdb3
编写于
7月 03, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 03, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2543 from taosdata/feature/query
Feature/query
上级
559dbcd0
5a7d5123
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
285 addition
and
204 deletion
+285
-204
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+0
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-3
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+22
-26
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+25
-8
src/client/src/tscServer.c
src/client/src/tscServer.c
+1
-1
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+0
-25
src/common/inc/tcmdtype.h
src/common/inc/tcmdtype.h
+3
-3
src/common/inc/tdataformat.h
src/common/inc/tdataformat.h
+3
-3
src/common/inc/tname.h
src/common/inc/tname.h
+2
-0
src/common/src/sqlcmdstr.c
src/common/src/sqlcmdstr.c
+1
-1
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+4
-4
src/common/src/tname.c
src/common/src/tname.c
+26
-1
src/inc/query.h
src/inc/query.h
+1
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-2
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+6
-13
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+81
-41
src/query/src/qast.c
src/query/src/qast.c
+1
-3
src/query/src/qparserImpl.c
src/query/src/qparserImpl.c
+3
-3
src/query/src/sql.c
src/query/src/sql.c
+5
-5
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+45
-11
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+14
-17
src/util/src/tcache.c
src/util/src/tcache.c
+0
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+3
-2
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+34
-28
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
f159fdb3
...
...
@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray
*
tscColumnListClone
(
const
SArray
*
src
,
int16_t
tableIndex
);
void
tscColumnListDestroy
(
SArray
*
pColList
);
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
);
int32_t
tscValidateName
(
SSQLToken
*
pToken
);
void
tscIncStreamExecutionCount
(
void
*
pStream
);
...
...
src/client/inc/tsclient.h
浏览文件 @
f159fdb3
...
...
@@ -32,8 +32,8 @@ extern "C" {
#include "qExecutor.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "tcmdtype.h"
// forward declaration
struct
SSqlInfo
;
...
...
@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void
*
param
,
void
**
taos
);
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
;
int
doAsyncParseSql
(
SSqlObj
*
pSql
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size_t
sqlLen
);
void
tscProcessMultiVnodesImportFromFile
(
SSqlObj
*
pSql
);
...
...
@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql);
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pObj
,
int32_t
numOfRes
,
int32_t
rowLen
);
bool
tscIsUpdateQuery
(
SSqlObj
*
pSql
);
bool
tscHasReachLimitation
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
// todo remove this function.
bool
tscResultsetFetchCompleted
(
TAOS_RES
*
result
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
int32_t
tscInvalidSQLErrMsg
(
char
*
msg
,
const
char
*
additionalInfo
,
const
char
*
sql
);
void
tscQueueAsyncFreeResult
(
SSqlObj
*
pSql
);
int32_t
tscToSQLCmd
(
SSqlObj
*
pSql
,
struct
SSqlInfo
*
pInfo
);
void
tscGetResultColumnChr
(
SSqlRes
*
pRes
,
SFieldInfo
*
pFieldInfo
,
int32_t
column
);
...
...
src/client/src/tscAsync.c
浏览文件 @
f159fdb3
...
...
@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// handle the sub queries of join query
if
(
pCmd
->
command
==
TSDB_SQL_TABLE_JOIN_RETRIEVE
)
{
tscFetchDatablockFromSubquery
(
pSql
);
}
else
if
(
pRes
->
completed
&&
pCmd
->
command
==
TSDB_SQL_FETCH
)
{
if
(
hasMoreVnodesToTry
(
pSql
))
{
// sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode
(
pSql
,
tscAsyncQueryRowsForNextVnode
);
return
;
}
else
{
/*
}
else
if
(
pRes
->
completed
)
{
if
(
pCmd
->
command
==
TSDB_SQL_FETCH
)
{
if
(
hasMoreVnodesToTry
(
pSql
))
{
// sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode
(
pSql
,
tscAsyncQueryRowsForNextVnode
);
return
;
}
else
{
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if
(
pCmd
->
clauseIndex
<
pCmd
->
numOfClause
-
1
)
{
tscTryQueryNextClause
(
pSql
,
tscAsyncQueryRowsForNextVnode
);
return
;
}
/*
*/
if
(
pCmd
->
clauseIndex
<
pCmd
->
numOfClause
-
1
)
{
tscTryQueryNextClause
(
pSql
,
tscAsyncQueryRowsForNextVnode
);
return
;
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
*/
(
*
pSql
->
fetchFp
)(
param
,
pSql
,
0
);
}
return
;
}
else
if
(
pCmd
->
command
==
TSDB_SQL_RETRIEVE
)
{
// in case of show command, return no data
(
*
pSql
->
fetchFp
)(
param
,
pSql
,
0
);
}
else
{
assert
(
0
);
}
return
;
}
else
{
// current query is not completed, continue retrieve from node
if
(
pCmd
->
command
!=
TSDB_SQL_RETRIEVE_LOCALMERGE
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
...
...
@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) {
taos_free_result
(
pSql
);
}
void
tscQueueAsyncFreeResult
(
SSqlObj
*
pSql
)
{
tscDebug
(
"%p sqlObj put in queue to async free"
,
pSql
);
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
tscProcessAsyncFree
;
schedMsg
.
ahandle
=
pSql
;
schedMsg
.
thandle
=
(
void
*
)
1
;
schedMsg
.
msg
=
NULL
;
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
int
tscSendMsgToServer
(
SSqlObj
*
pSql
);
void
tscTableMetaCallBack
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
...
...
src/client/src/tscSQLParser.c
浏览文件 @
f159fdb3
...
...
@@ -4452,6 +4452,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tVariantList
*
pVarList
=
pAlterSQL
->
varList
;
tVariant
*
pTagName
=
&
pVarList
->
a
[
0
].
pVar
;
int16_t
numOfTags
=
tscGetNumOfTags
(
pTableMeta
);
SColumnIndex
columnIndex
=
COLUMN_INDEX_INITIALIZER
;
SSQLToken
name
=
{.
type
=
TK_STRING
,
.
z
=
pTagName
->
pz
,
.
n
=
pTagName
->
nLen
};
...
...
@@ -4475,8 +4476,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
(
pVarList
->
a
[
1
].
pVar
.
nLen
+
VARSTR_HEADER_SIZE
)
>
pTagsSchema
->
bytes
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg14
);
}
int32_t
size
=
sizeof
(
SUpdateTableTagValMsg
)
+
pTagsSchema
->
bytes
+
TSDB_EXTRA_PAYLOAD_SIZE
;
int32_t
schemaLen
=
sizeof
(
STColumn
)
*
numOfTags
;
int32_t
size
=
sizeof
(
SUpdateTableTagValMsg
)
+
pTagsSchema
->
bytes
+
schemaLen
+
TSDB_EXTRA_PAYLOAD_SIZE
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
size
))
{
tscError
(
"%p failed to malloc for alter table msg"
,
pSql
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -4487,22 +4490,36 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pUpdateMsg
->
tid
=
htonl
(
pTableMeta
->
sid
);
pUpdateMsg
->
uid
=
htobe64
(
pTableMeta
->
uid
);
pUpdateMsg
->
colId
=
htons
(
pTagsSchema
->
colId
);
pUpdateMsg
->
type
=
htons
(
pTagsSchema
->
type
);
pUpdateMsg
->
bytes
=
htons
(
pTagsSchema
->
bytes
);
pUpdateMsg
->
tversion
=
htons
(
pTableMeta
->
tversion
);
tVariantDump
(
&
pVarList
->
a
[
1
].
pVar
,
pUpdateMsg
->
data
,
pTagsSchema
->
type
,
true
);
pUpdateMsg
->
numOfTags
=
htons
(
numOfTags
);
pUpdateMsg
->
schemaLen
=
htonl
(
schemaLen
);
// the schema is located after the msg body, then followed by true tag value
char
*
d
=
pUpdateMsg
->
data
;
SSchema
*
pTagCols
=
tscGetTableTagSchema
(
pTableMeta
);
for
(
int
i
=
0
;
i
<
numOfTags
;
++
i
)
{
STColumn
*
pCol
=
(
STColumn
*
)
d
;
pCol
->
colId
=
htons
(
pTagCols
[
i
].
colId
);
pCol
->
bytes
=
htons
(
pTagCols
[
i
].
bytes
);
pCol
->
type
=
pTagCols
[
i
].
type
;
pCol
->
offset
=
0
;
d
+=
sizeof
(
STColumn
);
}
// copy the tag value to msg body
tVariantDump
(
&
pVarList
->
a
[
1
].
pVar
,
pUpdateMsg
->
data
+
schemaLen
,
pTagsSchema
->
type
,
true
);
int32_t
len
=
0
;
if
(
pTagsSchema
->
type
!=
TSDB_DATA_TYPE_BINARY
&&
pTagsSchema
->
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
len
=
tDataTypeDesc
[
pTagsSchema
->
type
].
nSize
;
}
else
{
len
=
varDataTLen
(
pUpdateMsg
->
data
);
len
=
varDataTLen
(
pUpdateMsg
->
data
+
schemaLen
);
}
pUpdateMsg
->
tagValLen
=
htonl
(
len
);
// length may be changed after dump data
int32_t
total
=
sizeof
(
SUpdateTableTagValMsg
)
+
len
;
int32_t
total
=
sizeof
(
SUpdateTableTagValMsg
)
+
len
+
schemaLen
;
pUpdateMsg
->
head
.
contLen
=
htonl
(
total
);
}
else
if
(
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
f159fdb3
...
...
@@ -14,8 +14,8 @@
*/
#include "os.h"
#include "qsqltype.h"
#include "tcache.h"
#include "tcmdtype.h"
#include "trpc.h"
#include "tscLocalMerge.h"
#include "tscLog.h"
...
...
src/client/src/tscUtil.c
浏览文件 @
f159fdb3
...
...
@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return
taosArrayGetP
(
pColumnList
,
i
);
}
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
)
{
if
(
numOfFilters
==
0
)
{
assert
(
src
==
NULL
);
return
NULL
;
}
SColumnFilterInfo
*
pFilter
=
calloc
(
1
,
numOfFilters
*
sizeof
(
SColumnFilterInfo
));
memcpy
(
pFilter
,
src
,
sizeof
(
SColumnFilterInfo
)
*
numOfFilters
);
for
(
int32_t
j
=
0
;
j
<
numOfFilters
;
++
j
)
{
if
(
pFilter
[
j
].
filterstr
)
{
size_t
len
=
(
size_t
)
pFilter
[
j
].
len
+
1
*
TSDB_NCHAR_SIZE
;
pFilter
[
j
].
pz
=
(
int64_t
)
calloc
(
1
,
len
);
memcpy
((
char
*
)
pFilter
[
j
].
pz
,
(
char
*
)
src
[
j
].
pz
,
(
size_t
)
len
);
}
}
assert
(
src
->
filterstr
==
0
||
src
->
filterstr
==
1
);
assert
(
!
(
src
->
lowerRelOptr
==
TSDB_RELATION_INVALID
&&
src
->
upperRelOptr
==
TSDB_RELATION_INVALID
));
return
pFilter
;
}
static
void
destroyFilterInfo
(
SColumnFilterInfo
*
pFilterInfo
,
int32_t
numOfFilters
)
{
for
(
int32_t
i
=
0
;
i
<
numOfFilters
;
++
i
)
{
if
(
pFilterInfo
[
i
].
filterstr
)
{
...
...
src/common/inc/
qsql
type.h
→
src/common/inc/
tcmd
type.h
浏览文件 @
f159fdb3
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
QSQLCMD
_H
#define TDENGINE_
QSQLCMD
_H
#ifndef TDENGINE_
TSQLMSGTYPE
_H
#define TDENGINE_
TSQLMSGTYPE
_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -109,4 +109,4 @@ extern char *sqlCmd[];
}
#endif
#endif // TDENGINE_
QSQLCMD
_H
#endif // TDENGINE_
TSQLMSGTYPE
_H
src/common/inc/tdataformat.h
浏览文件 @
f159fdb3
...
...
@@ -50,8 +50,8 @@ extern "C" {
typedef
struct
{
int8_t
type
;
// Column type
int16_t
colId
;
// column ID
int
32
_t
bytes
;
// column bytes
int
32
_t
offset
;
// point offset in SDataRow after the header part
int
16
_t
bytes
;
// column bytes
int
16
_t
offset
;
// point offset in SDataRow after the header part
}
STColumn
;
#define colType(col) ((col)->type)
...
...
@@ -116,7 +116,7 @@ typedef struct {
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
int32_t
version
);
void
tdDestroyTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
);
void
tdResetTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
int32_t
version
);
int
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int16_t
colId
,
int
32
_t
bytes
);
int
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int16_t
colId
,
int
16
_t
bytes
);
STSchema
*
tdGetSchemaFromBuilder
(
STSchemaBuilder
*
pBuilder
);
// ----------------- Data row structure
...
...
src/common/inc/tname.h
浏览文件 @
f159fdb3
...
...
@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema();
bool
tscValidateTableNameLength
(
size_t
len
);
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
);
#endif // TDENGINE_NAME_H
src/common/src/sqlcmdstr.c
浏览文件 @
f159fdb3
...
...
@@ -15,4 +15,4 @@
#define TSDB_SQL_C
#include "
qsql
type.h"
#include "
tcmd
type.h"
src/common/src/tdataformat.c
浏览文件 @
f159fdb3
...
...
@@ -43,7 +43,7 @@ int tdEncodeSchema(void **buf, STSchema *pSchema) {
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
tlen
+=
taosEncodeFixedI8
(
buf
,
colType
(
pCol
));
tlen
+=
taosEncodeFixedI16
(
buf
,
colColId
(
pCol
));
tlen
+=
taosEncodeFixedI
32
(
buf
,
colBytes
(
pCol
));
tlen
+=
taosEncodeFixedI
16
(
buf
,
colBytes
(
pCol
));
}
return
tlen
;
...
...
@@ -65,10 +65,10 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
int8_t
type
=
0
;
int16_t
colId
=
0
;
int
32
_t
bytes
=
0
;
int
16
_t
bytes
=
0
;
buf
=
taosDecodeFixedI8
(
buf
,
&
type
);
buf
=
taosDecodeFixedI16
(
buf
,
&
colId
);
buf
=
taosDecodeFixedI
32
(
buf
,
&
bytes
);
buf
=
taosDecodeFixedI
16
(
buf
,
&
bytes
);
if
(
tdAddColToSchema
(
&
schemaBuilder
,
type
,
colId
,
bytes
)
<
0
)
{
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
return
NULL
;
...
...
@@ -105,7 +105,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder
->
version
=
version
;
}
int
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int16_t
colId
,
int
32
_t
bytes
)
{
int
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int16_t
colId
,
int
16
_t
bytes
)
{
if
(
!
isValidDataType
(
type
))
return
-
1
;
if
(
pBuilder
->
nCols
>=
pBuilder
->
tCols
)
{
...
...
src/common/src/tname.c
浏览文件 @
f159fdb3
...
...
@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() {
bool
tscValidateTableNameLength
(
size_t
len
)
{
return
len
<
TSDB_TABLE_NAME_LEN
;
}
\ No newline at end of file
}
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
)
{
if
(
numOfFilters
==
0
)
{
assert
(
src
==
NULL
);
return
NULL
;
}
SColumnFilterInfo
*
pFilter
=
calloc
(
1
,
numOfFilters
*
sizeof
(
SColumnFilterInfo
));
memcpy
(
pFilter
,
src
,
sizeof
(
SColumnFilterInfo
)
*
numOfFilters
);
for
(
int32_t
j
=
0
;
j
<
numOfFilters
;
++
j
)
{
if
(
pFilter
[
j
].
filterstr
)
{
size_t
len
=
(
size_t
)
pFilter
[
j
].
len
+
1
*
TSDB_NCHAR_SIZE
;
pFilter
[
j
].
pz
=
(
int64_t
)
calloc
(
1
,
len
);
memcpy
((
char
*
)
pFilter
[
j
].
pz
,
(
char
*
)
src
[
j
].
pz
,
(
size_t
)
len
);
}
}
assert
(
src
->
filterstr
==
0
||
src
->
filterstr
==
1
);
assert
(
!
(
src
->
lowerRelOptr
==
TSDB_RELATION_INVALID
&&
src
->
upperRelOptr
==
TSDB_RELATION_INVALID
));
return
pFilter
;
}
src/inc/query.h
浏览文件 @
f159fdb3
...
...
@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo
* @return
*/
void
qTableQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
);
void
qTableQuery
(
qinfo_t
qinfo
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
src/inc/taosmsg.h
浏览文件 @
f159fdb3
...
...
@@ -285,9 +285,9 @@ typedef struct {
int32_t
tid
;
int16_t
tversion
;
int16_t
colId
;
int16_t
type
;
int16_t
bytes
;
int32_t
tagValLen
;
int16_t
numOfTags
;
int32_t
schemaLen
;
char
data
[];
}
SUpdateTableTagValMsg
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
f159fdb3
...
...
@@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo {
}
SSingleColumnFilterInfo
;
typedef
struct
STableQueryInfo
{
// todo merge with the STableQueryInfo struct
int32_t
tableIndex
;
int32_t
groupIndex
;
// group id in table list
TSKEY
lastKey
;
int32_t
numOfRes
;
int32_t
groupIndex
;
// group id in table list
int16_t
queryRangeSet
;
// denote if the query range is set, only available for interval query
int64_t
tag
;
STimeWindow
win
;
STSCursor
cur
;
void
*
pTable
;
// for retrieve the page id list
void
*
pTable
;
// for retrieve the page id list
SWindowResInfo
windowResInfo
;
}
STableQueryInfo
;
...
...
@@ -127,11 +124,6 @@ typedef struct SQueryCostInfo {
uint64_t
computTime
;
}
SQueryCostInfo
;
//typedef struct SGroupItem {
// void *pTable;
// STableQueryInfo *info;
//} SGroupItem;
typedef
struct
SQuery
{
int16_t
numOfCols
;
int16_t
numOfTags
;
...
...
@@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv {
STSBuf
*
pTSBuf
;
STSCursor
cur
;
SQueryCostInfo
summary
;
bool
stableQuery
;
// super table query or not
void
*
pQueryHandle
;
void
*
pSecQueryHandle
;
// another thread for
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// false
int32_t
prevGroupId
;
// previous executed group id
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
}
SQueryRuntimeEnv
;
typedef
struct
SQInfo
{
...
...
@@ -205,7 +197,8 @@ typedef struct SQInfo {
*/
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
_qinfo_free_fn_t
fn
;
_qinfo_free_fn_t
freeFn
;
jmp_buf
env
;
}
SQInfo
;
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/src/qExecutor.c
浏览文件 @
f159fdb3
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "qfill.h"
#include "hash.h"
...
...
@@ -22,9 +23,8 @@
#include "qresultBuf.h"
#include "query.h"
#include "queryLog.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "
tscUtil.h" // todo move the function to common module
#include "
exception.h"
#include "tscompression.h"
#include "ttime.h"
...
...
@@ -87,6 +87,17 @@ typedef struct {
STSCursor
cur
;
}
SQueryStatusInfo
;
static
UNUSED_FUNC
void
*
u_malloc
(
size_t
__size
)
{
// uint32_t v = rand();
// if (v % 5 <= 1) {
// return NULL;
// } else {
return
malloc
(
__size
);
// }
}
#define malloc u_malloc
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
...
...
@@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
int64_t
getNumOfResultWindowRes
(
SQuery
*
pQuery
,
SWindowResult
*
pWindowRes
)
{
// int64_t maxOutput = 0;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
base
.
functionId
;
...
...
@@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
if
(
pResultInfo
->
numOfRes
>
0
)
{
return
pResultInfo
->
numOfRes
;
}
// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
// maxOutput = pResultInfo->numOfRes;
//
// if (maxOutput > 0) {
// break;
// }
// }
//
// assert(pResultInfo != NULL);
}
return
0
;
...
...
@@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
size_t
size
=
taosArrayGetSize
(
pGroup
);
tFilePage
**
buffer
=
pQuery
->
sdata
;
int32_t
*
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
int32_t
*
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
STableQueryInfo
**
pTableList
=
malloc
(
POINTER_BYTES
*
size
);
if
(
pTableList
==
NULL
||
posList
==
NULL
)
{
tfree
(
posList
);
tfree
(
pTableList
);
qError
(
"QInfo:%p failed alloc memory"
,
pQInfo
);
longjmp
(
pQInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// todo opt for the case of one table per group
int32_t
numOfTables
=
0
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
...
...
@@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return
pFillCol
;
}
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
STSBuf
*
pTsBuf
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
,
void
*
freeParam
,
_qinfo_free_fn_t
fn
)
{
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
STSBuf
*
pTsBuf
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
@@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
pQInfo
->
param
=
freeParam
;
pQInfo
->
fn
=
fn
;
pRuntimeEnv
->
pQuery
=
pQuery
;
pRuntimeEnv
->
pTSBuf
=
pTsBuf
;
...
...
@@ -4932,14 +4938,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pRuntimeEnv
->
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
assert
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
/* check if query is killed or not */
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
{
qDebug
(
"QInfo:%p query paused, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
}
}
static
void
stableQueryImpl
(
SQInfo
*
pQInfo
)
{
...
...
@@ -4961,10 +4959,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time
pQInfo
->
runtimeEnv
.
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
if
(
pQuery
->
rec
.
rows
==
0
)
{
qDebug
(
"QInfo:%p over, %zu tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
}
}
static
int32_t
getColumnIndexInSource
(
SQueryTableMsg
*
pQueryMsg
,
SSqlFuncMsg
*
pExprMsg
,
SColumnInfo
*
pTagCols
)
{
...
...
@@ -5076,6 +5070,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*/
static
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
SArray
**
pTableIdList
,
SSqlFuncMsg
***
pExpr
,
char
**
tagCond
,
char
**
tbnameCond
,
SColIndex
**
groupbyCols
,
SColumnInfo
**
tagCols
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pQueryMsg
->
numOfTables
=
htonl
(
pQueryMsg
->
numOfTables
);
pQueryMsg
->
window
.
skey
=
htobe64
(
pQueryMsg
->
window
.
skey
);
...
...
@@ -5102,7 +5098,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// query msg safety check
if
(
!
validateQueryMsg
(
pQueryMsg
))
{
return
TSDB_CODE_QRY_INVALID_MSG
;
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
char
*
pMsg
=
(
char
*
)(
pQueryMsg
->
colList
)
+
sizeof
(
SColumnInfo
)
*
pQueryMsg
->
numOfCols
;
...
...
@@ -5174,7 +5171,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int16_t
functionId
=
pExprMsg
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
||
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
if
(
pExprMsg
->
colInfo
.
flag
!=
TSDB_COL_TAG
)
{
// ignore the column index check for arithmetic expression.
return
TSDB_CODE_QRY_INVALID_MSG
;
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
}
else
{
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
...
...
@@ -5186,6 +5184,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
if
(
!
validateQuerySourceCols
(
pQueryMsg
,
*
pExpr
))
{
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_cleanup
;
}
...
...
@@ -5193,6 +5192,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
pQueryMsg
->
numOfGroupCols
>
0
)
{
// group by tag columns
*
groupbyCols
=
malloc
(
pQueryMsg
->
numOfGroupCols
*
sizeof
(
SColIndex
));
if
(
*
groupbyCols
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_cleanup
;
}
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfGroupCols
;
++
i
)
{
(
*
groupbyCols
)[
i
].
colId
=
*
(
int16_t
*
)
pMsg
;
...
...
@@ -5248,7 +5251,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
*
pMsg
!=
0
)
{
size_t
len
=
strlen
(
pMsg
)
+
1
;
*
tbnameCond
=
malloc
(
len
);
if
(
*
tbnameCond
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_cleanup
;
}
strcpy
(
*
tbnameCond
,
pMsg
);
pMsg
+=
len
;
}
...
...
@@ -5258,7 +5267,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg
,
pQueryMsg
->
numOfTables
,
pQueryMsg
->
queryType
,
pQueryMsg
->
window
.
skey
,
pQueryMsg
->
window
.
ekey
,
pQueryMsg
->
numOfGroupCols
,
pQueryMsg
->
order
,
pQueryMsg
->
numOfOutput
,
pQueryMsg
->
numOfCols
,
pQueryMsg
->
intervalTime
,
pQueryMsg
->
fillType
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
limit
,
pQueryMsg
->
offset
);
return
0
;
return
TSDB_CODE_SUCCESS
;
_cleanup:
tfree
(
*
pExpr
);
...
...
@@ -5268,7 +5278,8 @@ _cleanup:
tfree
(
*
groupbyCols
);
tfree
(
*
tagCols
);
tfree
(
*
tagCond
);
return
TSDB_CODE_QRY_INVALID_MSG
;
return
code
;
}
static
int32_t
buildAirthmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
SQueryTableMsg
*
pQueryMsg
)
{
...
...
@@ -5656,7 +5667,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STableQueryInfo
*
item
=
createTableQueryInfo
(
&
pQInfo
->
runtimeEnv
,
pTable
,
window
);
item
->
groupIndex
=
i
;
item
->
tableIndex
=
tableIndex
++
;
taosArrayPush
(
p1
,
&
item
);
taosHashPut
(
pQInfo
->
tableqinfoGroupInfo
.
map
,
&
id
.
tid
,
sizeof
(
id
.
tid
),
&
item
,
POINTER_BYTES
);
}
...
...
@@ -5670,7 +5680,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery
->
window
=
pQueryMsg
->
window
;
if
(
sem_init
(
&
pQInfo
->
dataReady
,
0
,
0
)
!=
0
)
{
qError
(
"QInfo:%p init dataReady sem failed, reason:%s"
,
pQInfo
,
strerror
(
errno
));
int32_t
code
=
TAOS_SYSTEM_ERROR
(
errno
);
qError
(
"QInfo:%p init dataReady sem failed, reason:%s"
,
pQInfo
,
tstrerror
(
code
));
goto
_cleanup
;
}
...
...
@@ -5681,7 +5692,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
_cleanup:
freeQInfo
(
pQInfo
);
return
NULL
;
}
...
...
@@ -5723,6 +5733,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return
TSDB_CODE_SUCCESS
;
}
pQInfo
->
param
=
param
;
pQInfo
->
freeFn
=
fn
;
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table qualified for tag filter, abort query"
,
pQInfo
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
...
...
@@ -5732,7 +5745,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
}
// filter the qualified
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
,
param
,
fn
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -6032,19 +6045,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
qDebug
(
"QInfo:%p dec refCount, value:%d"
,
pQInfo
,
ref
);
if
(
ref
==
0
)
{
_qinfo_free_fn_t
f
n
=
pQInfo
->
f
n
;
_qinfo_free_fn_t
f
reeFp
=
pQInfo
->
freeF
n
;
void
*
param
=
pQInfo
->
param
;
doDestoryQueryInfo
(
pQInfo
);
if
(
f
n
!=
NULL
)
{
if
(
f
reeFp
!=
NULL
)
{
assert
(
param
!=
NULL
);
f
n
(
param
);
f
reeFp
(
param
);
}
}
}
void
qTableQuery
(
qinfo_t
qinfo
,
void
(
*
fp
)(
void
*
),
void
*
param
)
{
void
qTableQuery
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
pQInfo
->
signature
!=
pQInfo
)
{
...
...
@@ -6054,17 +6067,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p it is already killed, abort"
,
pQInfo
);
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table exists for query, abort"
,
pQInfo
);
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
int32_t
ret
=
setjmp
(
pQInfo
->
env
);
// error occurs, record the error code and return to client
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:%p query abort due to error occurs, code:%s"
,
pQInfo
,
tstrerror
(
pQInfo
->
code
));
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
return
;
}
qDebug
(
"QInfo:%p query task is launched"
,
pQInfo
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
onlyQueryTags
(
pQInfo
->
runtimeEnv
.
pQuery
))
{
assert
(
pQInfo
->
runtimeEnv
.
pQueryHandle
==
NULL
);
buildTagQueryResult
(
pQInfo
);
// todo support the limit/offset
...
...
@@ -6074,6 +6104,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
tableQueryImpl
(
pQInfo
);
}
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
if
(
pQuery
->
rec
.
rows
==
0
)
{
qDebug
(
"QInfo:%p over, %zu tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
}
else
{
qDebug
(
"QInfo:%p query paused, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
}
sem_post
(
&
pQInfo
->
dataReady
);
qDestroyQueryInfo
(
pQInfo
);
}
...
...
src/query/src/qast.c
浏览文件 @
f159fdb3
...
...
@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t
len
=
strlen
(
cond
)
+
VARSTR_HEADER_SIZE
;
char
*
p
=
exception_malloc
(
len
);
varDataSetLen
(
p
,
len
-
VARSTR_HEADER_SIZE
);
memcpy
(
varDataVal
(
p
),
cond
,
len
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
len
-
VARSTR_HEADER_SIZE
);
taosArrayPush
(
pVal
->
arr
,
&
p
);
}
...
...
src/query/src/qparserImpl.c
浏览文件 @
f159fdb3
...
...
@@ -15,16 +15,16 @@
#include "os.h"
#include "qsqlparser.h"
#include "queryLog.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tcmdtype.h"
#include "tglobal.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "ttokendef.h"
#include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
#include "queryLog.h"
SSqlInfo
qSQLParse
(
const
char
*
pStr
)
{
void
*
pParser
=
ParseAlloc
(
malloc
);
...
...
src/query/src/sql.c
浏览文件 @
f159fdb3
...
...
@@ -25,17 +25,17 @@
#include <stdio.h>
/************ Begin %include sections from the grammar ************************/
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "tutil.h"
#include "qsqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h"
#include "tvariant.h"
#include "ttokendef.h"
#include "qsqltype.h"
#include "tutil.h"
#include "tvariant.h"
/**************** End of %include directives **********************************/
/* These constants specify the various numeric values for terminal symbols
** in a format understandable to "makeheaders". This section is blank unless
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
f159fdb3
...
...
@@ -255,17 +255,46 @@ _err:
return
NULL
;
}
static
int32_t
colIdCompar
(
const
void
*
left
,
const
void
*
right
)
{
int16_t
colId
=
*
(
int16_t
*
)
left
;
STColumn
*
p2
=
(
STColumn
*
)
right
;
if
(
colId
==
p2
->
colId
)
{
return
0
;
}
return
(
colId
<
p2
->
colId
)
?
-
1
:
1
;
}
int
tsdbUpdateTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int16_t
tversion
=
htons
(
pMsg
->
tversion
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
htobe64
(
pMsg
->
uid
));
pMsg
->
uid
=
htobe64
(
pMsg
->
uid
);
pMsg
->
tid
=
htonl
(
pMsg
->
tid
);
pMsg
->
tversion
=
htons
(
pMsg
->
tversion
);
pMsg
->
colId
=
htons
(
pMsg
->
colId
);
pMsg
->
tagValLen
=
htonl
(
pMsg
->
tagValLen
);
pMsg
->
numOfTags
=
htons
(
pMsg
->
numOfTags
);
pMsg
->
schemaLen
=
htonl
(
pMsg
->
schemaLen
);
assert
(
pMsg
->
schemaLen
==
sizeof
(
STColumn
)
*
pMsg
->
numOfTags
);
char
*
d
=
pMsg
->
data
;
for
(
int32_t
i
=
0
;
i
<
pMsg
->
numOfTags
;
++
i
)
{
STColumn
*
pCol
=
(
STColumn
*
)
d
;
pCol
->
colId
=
htons
(
pCol
->
colId
);
pCol
->
bytes
=
htons
(
pCol
->
bytes
);
pCol
->
offset
=
0
;
d
+=
sizeof
(
STColumn
);
}
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
pMsg
->
uid
);
if
(
pTable
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
return
-
1
;
}
if
(
TABLE_TID
(
pTable
)
!=
htonl
(
pMsg
->
tid
)
)
{
if
(
TABLE_TID
(
pTable
)
!=
pMsg
->
tid
)
{
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
return
-
1
;
}
...
...
@@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
return
-
1
;
}
if
(
schemaVersion
(
tsdbGetTableTagSchema
(
pTable
))
<
tversion
)
{
if
(
schemaVersion
(
tsdbGetTableTagSchema
(
pTable
))
<
pMsg
->
tversion
)
{
tsdbDebug
(
"vgId:%d server tag version %d is older than client tag version %d, try to config"
,
REPO_ID
(
pRepo
),
schemaVersion
(
tsdbGetTableTagSchema
(
pTable
)),
tversion
);
void
*
msg
=
(
*
pRepo
->
appH
.
configFunc
)(
pRepo
->
config
.
tsdbId
,
htonl
(
pMsg
->
tid
)
);
schemaVersion
(
tsdbGetTableTagSchema
(
pTable
)),
pMsg
->
tversion
);
void
*
msg
=
(
*
pRepo
->
appH
.
configFunc
)(
pRepo
->
config
.
tsdbId
,
pMsg
->
tid
);
if
(
msg
==
NULL
)
return
-
1
;
// Deal with error her
...
...
@@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STSchema
*
pTagSchema
=
tsdbGetTableTagSchema
(
pTable
);
if
(
schemaVersion
(
pTagSchema
)
>
tversion
)
{
if
(
schemaVersion
(
pTagSchema
)
>
pMsg
->
tversion
)
{
tsdbError
(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
tversion
,
schemaVersion
(
pTable
->
tagSchema
));
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
pMsg
->
tversion
,
schemaVersion
(
pTable
->
tagSchema
));
return
TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE
;
}
if
(
schemaColAt
(
pTagSchema
,
DEFAULT_TAG_INDEX_COLUMN
)
->
colId
==
htons
(
pMsg
->
colId
)
)
{
if
(
schemaColAt
(
pTagSchema
,
DEFAULT_TAG_INDEX_COLUMN
)
->
colId
==
pMsg
->
colId
)
{
tsdbRemoveTableFromIndex
(
pMeta
,
pTable
);
}
// TODO: remove table from index if it is the first column of tag
tdSetKVRowDataOfCol
(
&
pTable
->
tagVal
,
htons
(
pMsg
->
colId
),
htons
(
pMsg
->
type
),
pMsg
->
data
);
if
(
schemaColAt
(
pTagSchema
,
DEFAULT_TAG_INDEX_COLUMN
)
->
colId
==
htons
(
pMsg
->
colId
))
{
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
STColumn
*
res
=
bsearch
(
&
pMsg
->
colId
,
pMsg
->
data
,
pMsg
->
numOfTags
,
sizeof
(
STColumn
),
colIdCompar
);
assert
(
res
!=
NULL
);
tdSetKVRowDataOfCol
(
&
pTable
->
tagVal
,
pMsg
->
colId
,
res
->
type
,
pMsg
->
data
+
pMsg
->
schemaLen
);
if
(
schemaColAt
(
pTagSchema
,
DEFAULT_TAG_INDEX_COLUMN
)
->
colId
==
pMsg
->
colId
)
{
tsdbAddTableIntoIndex
(
pMeta
,
pTable
);
}
return
TSDB_CODE_SUCCESS
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
f159fdb3
...
...
@@ -74,9 +74,6 @@ typedef struct STableCheckInfo {
SDataCols
*
pDataCols
;
int32_t
chosen
;
// indicate which iterator should move forward
bool
initBuf
;
// whether to initialize the in-memory skip list iterator or not
SMemTable
*
mem
;
// in-mem buffer, hold the ref count
SMemTable
*
imem
;
// imem buffer, hold the ref count to avoid release
SSkipListIterator
*
iter
;
// mem buffer skip list iterator
SSkipListIterator
*
iiter
;
// imem buffer skip list iterator
}
STableCheckInfo
;
...
...
@@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle {
SFileGroupIter
fileIter
;
SRWHelper
rhelper
;
STableBlockInfo
*
pDataBlockInfo
;
SMemTable
*
mem
;
// mem-table
SMemTable
*
imem
;
// imem-table, acquired from snapshot
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQuery */
...
...
@@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
}
TsdbQueryHandleT
*
tsdbQueryTables
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle
*
pQueryHandle
=
calloc
(
1
,
sizeof
(
STsdbQueryHandle
));
pQueryHandle
->
order
=
pCond
->
order
;
pQueryHandle
->
window
=
pCond
->
twindow
;
...
...
@@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle
->
outputCapacity
=
((
STsdbRepo
*
)
tsdb
)
->
config
.
maxRowsPerFileBlock
;
tsdbInitReadHelper
(
&
pQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
tsdb
);
tsdbTakeMemSnapshot
(
pQueryHandle
->
pTsdb
,
&
pQueryHandle
->
mem
,
&
pQueryHandle
->
imem
);
size_t
sizeOfGroup
=
taosArrayGetSize
(
groupList
->
pGroupList
);
assert
(
sizeOfGroup
>=
1
&&
pCond
!=
NULL
&&
pCond
->
numOfCols
>
0
);
...
...
@@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo
->
initBuf
=
true
;
int32_t
order
=
pHandle
->
order
;
tsdbTakeMemSnapshot
(
pHandle
->
pTsdb
,
&
pCheckInfo
->
mem
,
&
pCheckInfo
->
imem
);
//
tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort
if
(
p
CheckInfo
->
mem
==
NULL
&&
pCheckInfo
->
imem
==
NULL
)
{
if
(
p
Handle
->
mem
==
NULL
&&
pHandle
->
imem
==
NULL
)
{
return
false
;
}
assert
(
pCheckInfo
->
iter
==
NULL
&&
pCheckInfo
->
iiter
==
NULL
);
if
(
p
CheckInfo
->
mem
&&
pCheckInfo
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iter
=
tSkipListCreateIterFromVal
(
p
CheckInfo
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
if
(
p
Handle
->
mem
&&
pHandle
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iter
=
tSkipListCreateIterFromVal
(
p
Handle
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
}
if
(
p
CheckInfo
->
imem
&&
pCheckInfo
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iiter
=
tSkipListCreateIterFromVal
(
p
CheckInfo
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
if
(
p
Handle
->
imem
&&
pHandle
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iiter
=
tSkipListCreateIterFromVal
(
p
Handle
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
}
...
...
@@ -2319,9 +2316,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo
*
pTableCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
tSkipListDestroyIter
(
pTableCheckInfo
->
iter
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pTableCheckInfo
->
mem
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pTableCheckInfo
->
imem
);
if
(
pTableCheckInfo
->
pDataCols
!=
NULL
)
{
tfree
(
pTableCheckInfo
->
pDataCols
->
buf
);
}
...
...
@@ -2341,9 +2335,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosArrayDestroy
(
pQueryHandle
->
pColumns
);
tfree
(
pQueryHandle
->
pDataBlockInfo
);
tfree
(
pQueryHandle
->
statis
);
// todo check error
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pQueryHandle
->
mem
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pQueryHandle
->
imem
);
tsdbDestroyHelper
(
&
pQueryHandle
->
rhelper
);
tfree
(
pQueryHandle
);
}
...
...
src/util/src/tcache.c
浏览文件 @
f159fdb3
...
...
@@ -547,7 +547,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
}
uError
(
"-------------------free obj:%p"
,
pElem
->
pData
);
free
(
pElem
->
pData
);
free
(
pElem
);
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
f159fdb3
...
...
@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) {
assert
(
refCount
>=
0
);
if
(
refCount
>
0
)
{
v
Trace
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
v
Debug
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
return
;
}
...
...
@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
if
(
pVnode
==
NULL
)
return
pVnode
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
v
Trace
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
v
Debug
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
pVnode
;
}
...
...
@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
// release local resources only after cutting off outside connections
taosCacheCleanup
(
pVnode
->
qHandlePool
);
vnodeRelease
(
pVnode
);
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
f159fdb3
...
...
@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// this message arrived here by means of the *query* message, so release the vnode is necessary
void
**
qhandle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
(
void
*
)
&
killQueryMsg
->
qhandle
,
sizeof
(
killQueryMsg
->
qhandle
));
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
// todo handle invalid qhandle error
if
(
qhandle
==
NULL
||
*
qhandle
==
NULL
)
{
vWarn
(
"QInfo:%p invalid qhandle, no matched query handle, conn:%p"
,
(
void
*
)
killQueryMsg
->
qhandle
,
pReadMsg
->
rpcMsg
.
handle
);
}
else
{
// qKillQuery((qinfo_t) killQueryMsg->qhandle);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
qhandle
,
true
);
}
vnodeRelease
(
pVnode
);
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
// todo change the error code
return
TSDB_CODE_TSC_QUERY_CANCELLED
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
vnodeRelease
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
pQInfo
))
;
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
pRsp
->
qhandle
=
0
;
pRet
->
len
=
sizeof
(
SQueryTableRsp
);
pRet
->
rsp
=
pRsp
;
...
...
@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
handle
=
taosCachePut
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
),
&
pQInfo
,
sizeof
(
pQInfo
),
tsShellActivityTimer
*
2
);
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
}
else
{
assert
(
pQInfo
==
NULL
);
vnodeRelease
(
pVnode
);
...
...
@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
pVnode
->
vgId
,
pQInfo
);
}
else
{
assert
(
pCont
!=
NULL
);
pQInfo
=
pCont
;
pQInfo
=
*
(
void
**
)(
pCont
);
handle
=
pCont
;
code
=
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg in progress"
,
pVnode
->
vgId
,
pQInfo
);
}
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
,
vnodeRelease
,
pVnode
);
// do execute query
qTableQuery
(
pQInfo
);
// do execute query
assert
(
handle
!=
NULL
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
false
);
}
...
...
@@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet
*
pRet
=
&
pReadMsg
->
rspRet
;
SRetrieveTableMsg
*
pRetrieve
=
pCont
;
void
*
pQInfo
=
(
void
*
)
htobe64
(
pRetrieve
->
qhandle
);
void
*
*
pQInfo
=
(
void
*
)
htobe64
(
pRetrieve
->
qhandle
);
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
*
pQInfo
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
ret
=
0
;
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
&
pQInfo
,
sizeof
(
pQInfo
));
if
(
handle
==
NULL
||
*
handle
!=
pQInfo
)
{
void
**
handle
=
taosCacheAcquireByKey
(
pVnode
->
qHandlePool
,
pQInfo
,
sizeof
(
pQInfo
));
if
(
handle
==
NULL
||
handle
!=
pQInfo
)
{
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
if
(
pRetrieve
->
free
==
1
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
handle
,
true
);
// int32_t ret = qKillQuery(pQInfo);
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
}
else
{
// todo handle error
}
return
ret
;
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is received"
,
pVnode
->
vgId
,
pQInfo
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is received"
,
pVnode
->
vgId
,
*
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
pQInfo
);
int32_t
code
=
qRetrieveQueryResultInfo
(
*
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
//TODO
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
}
else
{
// todo check code and handle error in build result set
code
=
qDumpRetrieveResult
(
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
code
=
qDumpRetrieveResult
(
*
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
if
(
qHasMoreResultsToRetrieve
(
pQInfo
))
{
pRet
->
qhandle
=
pQInfo
;
if
(
qHasMoreResultsToRetrieve
(
*
pQInfo
))
{
pRet
->
qhandle
=
handle
;
code
=
TSDB_CODE_VND_ACTION_NEED_REPROCESSED
;
}
else
{
// no further execution invoked, release the ref to vnode
taosCacheRelease
(
pVnode
->
qHandlePool
,
(
void
**
)
&
handle
,
true
);
// qDestroyQueryInfo(pQInfo);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录