Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c1c4f56
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4c1c4f56
编写于
7月 01, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/2.0tsdb
上级
1c4c9c95
e9eb13e7
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
166 addition
and
121 deletion
+166
-121
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+22
-26
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+23
-7
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+17
-5
src/common/inc/tdataformat.h
src/common/inc/tdataformat.h
+2
-1
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+16
-0
src/common/src/ttypes.c
src/common/src/ttypes.c
+23
-0
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+5
-22
src/inc/taosdef.h
src/inc/taosdef.h
+1
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-1
src/plugins/http/src/httpContext.c
src/plugins/http/src/httpContext.c
+7
-2
src/plugins/http/src/httpHandle.c
src/plugins/http/src/httpHandle.c
+12
-12
src/plugins/http/src/httpJson.c
src/plugins/http/src/httpJson.c
+8
-8
src/plugins/http/src/httpServer.c
src/plugins/http/src/httpServer.c
+0
-2
src/plugins/http/src/httpSystem.c
src/plugins/http/src/httpSystem.c
+4
-2
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+7
-19
src/util/src/tcache.c
src/util/src/tcache.c
+17
-14
未找到文件。
src/client/src/tscParseInsert.c
浏览文件 @
4c1c4f56
...
...
@@ -891,11 +891,15 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"keyword TAGS expected"
,
sToken
.
z
);
}
SKVRowBuilder
kvRowBuilder
=
{
0
};
if
(
tdInitKVRowBuilder
(
&
kvRowBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
uint32_t
ignoreTokenTypes
=
TK_LP
;
uint32_t
numOfIgnoreToken
=
1
;
for
(
int
i
=
0
;
i
<
spd
.
numOfAssignedCols
;
++
i
)
{
char
*
tagVal
=
pTag
->
data
+
spd
.
elems
[
i
].
offset
;
int16_t
colIndex
=
spd
.
elems
[
i
].
colIndex
;
SSchema
*
pSchema
=
pTagSchema
+
spd
.
elems
[
i
].
colIndex
;
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
true
,
numOfIgnoreToken
,
&
ignoreTokenTypes
);
...
...
@@ -911,12 +915,26 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sToken
.
n
-=
2
;
}
code
=
tsParseOneColumnData
(
&
pTagSchema
[
colIndex
],
&
sToken
,
tagVal
,
pCmd
->
payload
,
&
sql
,
false
,
tinfo
.
precision
);
char
tagVal
[
TSDB_MAX_TAGS_LEN
];
code
=
tsParseOneColumnData
(
pSchema
,
&
sToken
,
tagVal
,
pCmd
->
payload
,
&
sql
,
false
,
tinfo
.
precision
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
return
code
;
}
tdAddColToKVRow
(
&
kvRowBuilder
,
pSchema
->
colId
,
pSchema
->
type
,
tagVal
);
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
kvRowBuilder
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
if
(
row
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
tdSortKVRowByColIdx
(
row
);
pTag
->
dataLen
=
kvRowLen
(
row
);
kvRowCpy
(
pTag
->
data
,
row
);
free
(
row
);
index
=
0
;
sToken
=
tStrGetToken
(
sql
,
&
index
,
false
,
0
,
NULL
);
sql
+=
index
;
...
...
@@ -924,29 +942,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
") expected"
,
sToken
.
z
);
}
// 2. set the null value for the columns that do not assign values
if
(
spd
.
numOfAssignedCols
<
spd
.
numOfCols
)
{
char
*
ptr
=
pTag
->
data
;
for
(
int32_t
i
=
0
;
i
<
spd
.
numOfCols
;
++
i
)
{
if
(
!
spd
.
hasVal
[
i
])
{
// current tag column do not have any value to insert, set it to null
if
(
pTagSchema
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
||
pTagSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
ptr
,
pTagSchema
[
i
].
type
);
}
else
{
setNull
(
ptr
,
pTagSchema
[
i
].
type
,
pTagSchema
[
i
].
bytes
);
}
}
ptr
+=
pTagSchema
[
i
].
bytes
;
}
}
// 3. calculate the actual data size of STagData
pCmd
->
payloadLen
=
sizeof
(
pTag
->
name
)
+
sizeof
(
pTag
->
dataLen
);
for
(
int32_t
t
=
0
;
t
<
numOfTags
;
++
t
)
{
pTag
->
dataLen
+=
pTagSchema
[
t
].
bytes
;
pCmd
->
payloadLen
+=
pTagSchema
[
t
].
bytes
;
}
pCmd
->
payloadLen
=
sizeof
(
pTag
->
name
)
+
sizeof
(
pTag
->
dataLen
)
+
pTag
->
dataLen
;
pTag
->
dataLen
=
htonl
(
pTag
->
dataLen
);
if
(
tscValidateName
(
&
tableToken
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscSQLParser.c
浏览文件 @
4c1c4f56
...
...
@@ -5623,24 +5623,41 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
SSchema
*
pTagSchema
=
tscGetTableTagSchema
(
pStableMeterMetaInfo
->
pTableMeta
);
STagData
*
pTag
=
&
pCreateTable
->
usingInfo
.
tagdata
;
char
*
tagVal
=
pTag
->
data
;
SKVRowBuilder
kvRowBuilder
=
{
0
};
if
(
tdInitKVRowBuilder
(
&
kvRowBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
ret
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pList
->
nExpr
;
++
i
)
{
if
(
pTagSchema
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
||
pTagSchema
[
i
].
type
==
TSDB_DATA_TYPE_NCHAR
)
{
SSchema
*
pSchema
=
pTagSchema
+
i
;
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
// validate the length of binary
if
(
pList
->
a
[
i
].
pVar
.
nLen
+
VARSTR_HEADER_SIZE
>
pTagSchema
[
i
].
bytes
)
{
if
(
pList
->
a
[
i
].
pVar
.
nLen
+
VARSTR_HEADER_SIZE
>
pSchema
->
bytes
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
}
ret
=
tVariantDump
(
&
(
pList
->
a
[
i
].
pVar
),
tagVal
,
pTagSchema
[
i
].
type
,
true
);
char
tagVal
[
TSDB_MAX_TAGS_LEN
];
ret
=
tVariantDump
(
&
(
pList
->
a
[
i
].
pVar
),
tagVal
,
pSchema
->
type
,
true
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg4
);
}
tagVal
+=
pTagSchema
[
i
].
bytes
;
tdAddColToKVRow
(
&
kvRowBuilder
,
pSchema
->
colId
,
pSchema
->
type
,
tagVal
);
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
kvRowBuilder
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
if
(
row
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
tdSortKVRowByColIdx
(
row
);
pTag
->
dataLen
=
kvRowLen
(
row
);
kvRowCpy
(
pTag
->
data
,
row
);
free
(
row
);
// table name
if
(
tscValidateName
(
&
pInfo
->
pCreateTableInfo
->
name
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5653,7 +5670,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return
ret
;
}
pTag
->
dataLen
=
tagVal
-
pTag
->
data
;
return
TSDB_CODE_SUCCESS
;
}
...
...
src/client/src/tscUtil.c
浏览文件 @
4c1c4f56
...
...
@@ -625,18 +625,31 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
return
len
;
}
static
int32_t
getRowExpandSize
(
STableMeta
*
pTableMeta
)
{
int32_t
result
=
TD_DATA_ROW_HEAD_SIZE
;
int32_t
columns
=
tscGetNumOfColumns
(
pTableMeta
);
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
columns
;
i
++
)
{
if
(
IS_VAR_DATA_TYPE
((
pSchema
+
i
)
->
type
))
{
result
+=
TYPE_BYTES
[
TSDB_DATA_TYPE_BINARY
];
}
}
return
result
;
}
int32_t
tscMergeTableDataBlocks
(
SSqlObj
*
pSql
,
SArray
*
pTableDataBlockList
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const
int32_t
MAX_EXPAND_SIZE
=
TD_DATA_ROW_HEAD_SIZE
+
TYPE_BYTES
[
TSDB_DATA_TYPE_BINARY
];
STableDataBlocks
*
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
0
);
int32_t
expandSize
=
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
);
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
size_t
total
=
taosArrayGetSize
(
pTableDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
total
;
++
i
)
{
STableDataBlocks
*
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
i
);
pOneTableBlock
=
taosArrayGetP
(
pTableDataBlockList
,
i
);
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
...
...
@@ -650,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
}
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
int64_t
destSize
=
dataBuf
->
size
+
pOneTableBlock
->
size
+
pBlocks
->
numOfRows
*
MAX_EXPAND_SIZE
;
int64_t
destSize
=
dataBuf
->
size
+
pOneTableBlock
->
size
+
pBlocks
->
numOfRows
*
expandSize
;
if
(
dataBuf
->
nAllocSize
<
destSize
)
{
while
(
dataBuf
->
nAllocSize
<
destSize
)
{
...
...
@@ -678,8 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
tscDebug
(
"%p tableId:%s, sid:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pSql
,
pOneTableBlock
->
tableId
,
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
GET_INT64_VAL
(
pBlocks
->
data
),
GET_INT64_VAL
(
ekey
));
int32_t
len
=
pBlocks
->
numOfRows
*
(
pOneTableBlock
->
rowSize
+
MAX_EXPAND_SIZE
);
int32_t
len
=
pBlocks
->
numOfRows
*
(
pOneTableBlock
->
rowSize
+
expandSize
);
pBlocks
->
tid
=
htonl
(
pBlocks
->
tid
);
pBlocks
->
uid
=
htobe64
(
pBlocks
->
uid
);
...
...
src/common/inc/tdataformat.h
浏览文件 @
4c1c4f56
...
...
@@ -272,7 +272,7 @@ typedef struct {
int16_t
offset
;
}
SColIdx
;
#define TD_KV_ROW_HEAD_SIZE
2 * sizeof(int16_t
)
#define TD_KV_ROW_HEAD_SIZE
(2 * sizeof(int16_t)
)
#define kvRowLen(r) (*(int16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
...
...
@@ -290,6 +290,7 @@ SKVRow tdKVRowDup(SKVRow row);
int
tdSetKVRowDataOfCol
(
SKVRow
*
orow
,
int16_t
colId
,
int8_t
type
,
void
*
value
);
int
tdEncodeKVRow
(
void
**
buf
,
SKVRow
row
);
void
*
tdDecodeKVRow
(
void
*
buf
,
SKVRow
*
row
);
void
tdSortKVRowByColIdx
(
SKVRow
row
);
static
FORCE_INLINE
int
comparTagId
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
int16_t
*
)
key1
>
((
SColIdx
*
)
key2
)
->
colId
)
{
...
...
src/common/src/tdataformat.c
浏览文件 @
4c1c4f56
...
...
@@ -515,6 +515,22 @@ SKVRow tdKVRowDup(SKVRow row) {
return
trow
;
}
static
int
compareColIdx
(
const
void
*
a
,
const
void
*
b
)
{
const
SColIdx
*
x
=
(
const
SColIdx
*
)
a
;
const
SColIdx
*
y
=
(
const
SColIdx
*
)
b
;
if
(
x
->
colId
>
y
->
colId
)
{
return
1
;
}
if
(
x
->
colId
<
y
->
colId
)
{
return
-
1
;
}
return
0
;
}
void
tdSortKVRowByColIdx
(
SKVRow
row
)
{
qsort
(
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
compareColIdx
);
}
int
tdSetKVRowDataOfCol
(
SKVRow
*
orow
,
int16_t
colId
,
int8_t
type
,
void
*
value
)
{
SColIdx
*
pColIdx
=
NULL
;
SKVRow
row
=
*
orow
;
...
...
src/common/src/ttypes.c
浏览文件 @
4c1c4f56
...
...
@@ -464,6 +464,29 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
}
}
static
uint8_t
nullBool
=
TSDB_DATA_BOOL_NULL
;
static
uint8_t
nullTinyInt
=
TSDB_DATA_TINYINT_NULL
;
static
uint16_t
nullSmallInt
=
TSDB_DATA_SMALLINT_NULL
;
static
uint32_t
nullInt
=
TSDB_DATA_INT_NULL
;
static
uint64_t
nullBigInt
=
TSDB_DATA_BIGINT_NULL
;
static
uint32_t
nullFloat
=
TSDB_DATA_FLOAT_NULL
;
static
uint64_t
nullDouble
=
TSDB_DATA_DOUBLE_NULL
;
static
union
{
tstr
str
;
char
pad
[
sizeof
(
tstr
)
+
4
];
}
nullBinary
=
{.
str
=
{.
len
=
1
}},
nullNchar
=
{.
str
=
{.
len
=
4
}};
static
void
*
nullValues
[]
=
{
&
nullBool
,
&
nullTinyInt
,
&
nullSmallInt
,
&
nullInt
,
&
nullBigInt
,
&
nullFloat
,
&
nullDouble
,
&
nullBinary
,
&
nullBigInt
,
&
nullNchar
,
};
void
*
getNullValue
(
int32_t
type
)
{
assert
(
type
>=
TSDB_DATA_TYPE_BOOL
&&
type
<=
TSDB_DATA_TYPE_NCHAR
);
return
nullValues
[
type
-
1
];
}
void
assignVal
(
char
*
val
,
const
char
*
src
,
int32_t
len
,
int32_t
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
src/cq/src/cqMain.c
浏览文件 @
4c1c4f56
...
...
@@ -256,30 +256,13 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SDataRow
trow
=
(
SDataRow
)
pBlk
->
data
;
tdInitDataRow
(
trow
,
pSchema
);
union
{
char
buf
[
sizeof
(
int64_t
)];
tstr
str
;
}
nullVal
;
for
(
int32_t
i
=
0
;
i
<
pSchema
->
numOfCols
;
i
++
)
{
STColumn
*
c
=
pSchema
->
columns
+
i
;
char
*
val
=
(
char
*
)
row
[
i
];
if
(
IS_VAR_DATA_TYPE
(
c
->
type
))
{
if
(
val
==
NULL
)
{
val
=
nullVal
.
buf
;
if
(
c
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
setNull
(
nullVal
.
str
.
data
,
TSDB_DATA_TYPE_BINARY
,
1
);
nullVal
.
str
.
len
=
1
;
}
else
{
setNull
(
nullVal
.
str
.
data
,
TSDB_DATA_TYPE_NCHAR
,
4
);
nullVal
.
str
.
len
=
4
;
}
}
else
{
val
-=
sizeof
(
VarDataLenT
);
}
}
else
if
(
val
==
NULL
)
{
val
=
nullVal
.
buf
;
setNull
(
val
,
c
->
type
,
c
->
bytes
);
void
*
val
=
row
[
i
];
if
(
val
==
NULL
)
{
val
=
getNullValue
(
c
->
type
);
}
else
if
(
IS_VAR_DATA_TYPE
(
c
->
type
))
{
val
=
((
char
*
)
val
)
-
sizeof
(
VarDataLenT
);
}
tdAppendColVal
(
trow
,
val
,
c
->
type
,
c
->
bytes
,
c
->
offset
);
}
...
...
src/inc/taosdef.h
浏览文件 @
4c1c4f56
...
...
@@ -165,6 +165,7 @@ bool isNull(const char *val, int32_t type);
void
setVardataNull
(
char
*
val
,
int32_t
type
);
void
setNull
(
char
*
val
,
int32_t
type
,
int32_t
bytes
);
void
setNullN
(
char
*
val
,
int32_t
type
,
int32_t
bytes
,
int32_t
numOfElems
);
void
*
getNullValue
(
int32_t
type
);
void
assignVal
(
char
*
val
,
const
char
*
src
,
int32_t
len
,
int32_t
type
);
void
tsDataSwap
(
void
*
pLeft
,
void
*
pRight
,
int32_t
type
,
int32_t
size
);
...
...
src/inc/taosmsg.h
浏览文件 @
4c1c4f56
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
#include "taoserror.h"
#include "trpc.h"
#include "tdataformat.h"
// message type
...
...
@@ -674,7 +675,7 @@ typedef struct SMultiTableMeta {
typedef
struct
{
int32_t
dataLen
;
char
name
[
TSDB_TABLE_ID_LEN
];
char
data
[
TSDB_MAX_TAGS_LEN
];
char
data
[
TSDB_MAX_TAGS_LEN
+
TD_KV_ROW_HEAD_SIZE
+
sizeof
(
SColIdx
)
*
TSDB_MAX_TAGS
];
}
STagData
;
/*
...
...
src/plugins/http/src/httpContext.c
浏览文件 @
4c1c4f56
...
...
@@ -141,10 +141,15 @@ HttpContext *httpGetContext(void *ptr) {
void
httpReleaseContext
(
HttpContext
*
pContext
)
{
int32_t
refCount
=
atomic_sub_fetch_32
(
&
pContext
->
refCount
,
1
);
assert
(
refCount
>=
0
);
httpDebug
(
"context:%p,
fd:%d, is releasd, refCount:%d"
,
pContext
,
pContext
->
fd
,
refCount
);
httpDebug
(
"context:%p,
is releasd, refCount:%d"
,
pContext
,
refCount
);
HttpContext
**
ppContext
=
pContext
->
ppContext
;
taosCacheRelease
(
tsHttpServer
.
contextCache
,
(
void
**
)(
&
ppContext
),
false
);
if
(
tsHttpServer
.
contextCache
!=
NULL
)
{
taosCacheRelease
(
tsHttpServer
.
contextCache
,
(
void
**
)(
&
ppContext
),
false
);
}
else
{
httpDebug
(
"context:%p, won't be destroyed for cache is already released"
,
pContext
);
// httpDestroyContext((void **)(&ppContext));
}
}
bool
httpInitContext
(
HttpContext
*
pContext
)
{
...
...
src/plugins/http/src/httpHandle.c
浏览文件 @
4c1c4f56
...
...
@@ -157,7 +157,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
pParser
->
method
.
pos
[
pParser
->
method
.
len
]
=
0
;
pParser
->
pLast
=
pSeek
+
1
;
http
Debug
(
"context:%p, fd:%d, ip:%s, httpMethod:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pParser
->
method
.
pos
);
http
Trace
(
"context:%p, fd:%d, ip:%s, httpMethod:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pParser
->
method
.
pos
);
return
true
;
}
...
...
@@ -186,23 +186,23 @@ bool httpParseHead(HttpContext* pContext) {
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
strncasecmp
(
pParser
->
pLast
,
"Content-Length: "
,
16
)
==
0
)
{
pParser
->
data
.
len
=
(
int32_t
)
atoi
(
pParser
->
pLast
+
16
);
http
Debug
(
"context:%p, fd:%d, ip:%s, Content-Length:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
http
Trace
(
"context:%p, fd:%d, ip:%s, Content-Length:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pParser
->
data
.
len
);
}
else
if
(
strncasecmp
(
pParser
->
pLast
,
"Accept-Encoding: "
,
17
)
==
0
)
{
if
(
tsHttpEnableCompress
&&
strstr
(
pParser
->
pLast
+
17
,
"gzip"
)
!=
NULL
)
{
pContext
->
acceptEncoding
=
HTTP_COMPRESS_GZIP
;
http
Debug
(
"context:%p, fd:%d, ip:%s, Accept-Encoding:gzip"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, Accept-Encoding:gzip"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
}
else
{
pContext
->
acceptEncoding
=
HTTP_COMPRESS_IDENTITY
;
http
Debug
(
"context:%p, fd:%d, ip:%s, Accept-Encoding:identity"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, Accept-Encoding:identity"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
}
}
else
if
(
strncasecmp
(
pParser
->
pLast
,
"Content-Encoding: "
,
18
)
==
0
)
{
if
(
strstr
(
pParser
->
pLast
+
18
,
"gzip"
)
!=
NULL
)
{
pContext
->
contentEncoding
=
HTTP_COMPRESS_GZIP
;
http
Debug
(
"context:%p, fd:%d, ip:%s, Content-Encoding:gzip"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, Content-Encoding:gzip"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
}
else
{
pContext
->
contentEncoding
=
HTTP_COMPRESS_IDENTITY
;
http
Debug
(
"context:%p, fd:%d, ip:%s, Content-Encoding:identity"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, Content-Encoding:identity"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
}
}
else
if
(
strncasecmp
(
pParser
->
pLast
,
"Connection: "
,
12
)
==
0
)
{
if
(
strncasecmp
(
pParser
->
pLast
+
12
,
"Keep-Alive"
,
10
)
==
0
)
{
...
...
@@ -210,7 +210,7 @@ bool httpParseHead(HttpContext* pContext) {
}
else
{
pContext
->
httpKeepAlive
=
HTTP_KEEPALIVE_DISABLE
;
}
http
Debug
(
"context:%p, fd:%d, ip:%s, keepAlive:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
http
Trace
(
"context:%p, fd:%d, ip:%s, keepAlive:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
httpKeepAlive
);
}
else
if
(
strncasecmp
(
pParser
->
pLast
,
"Transfer-Encoding: "
,
19
)
==
0
)
{
if
(
strncasecmp
(
pParser
->
pLast
+
19
,
"chunked"
,
7
)
==
0
)
{
...
...
@@ -281,7 +281,7 @@ bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
httpParseChunkedBody
(
pContext
,
pParser
,
false
);
return
HTTP_CHECK_BODY_SUCCESS
;
}
else
{
http
Debug
(
"context:%p, fd:%d, ip:%s, chunked body not finished, continue read"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, chunked body not finished, continue read"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
if
(
!
httpReadDataImp
(
pContext
))
{
httpError
(
"context:%p, fd:%d, ip:%s, read chunked request error"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
HTTP_CHECK_BODY_ERROR
;
...
...
@@ -299,7 +299,7 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
httpSendErrorResp
(
pContext
,
HTTP_PARSE_BODY_ERROR
);
return
HTTP_CHECK_BODY_ERROR
;
}
else
if
(
dataReadLen
<
pParser
->
data
.
len
)
{
http
Debug
(
"context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read"
,
http
Trace
(
"context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
parser
.
bufsize
,
dataReadLen
,
pParser
->
data
.
len
);
return
HTTP_CHECK_BODY_CONTINUE
;
}
else
{
...
...
@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) {
return
true
;
}
http
Debug
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
.
bufsize
,
pContext
->
parser
.
buffer
);
http
TraceDump
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
.
bufsize
,
pContext
->
parser
.
buffer
);
if
(
!
httpGetHttpMethod
(
pContext
))
{
return
false
;
...
...
src/plugins/http/src/httpJson.c
浏览文件 @
4c1c4f56
...
...
@@ -76,8 +76,8 @@ int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) {
httpError
(
"context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
sz
,
writeSz
,
buf
);
}
else
{
http
Debug
(
"context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
sz
,
writeSz
,
buf
);
http
Trace
(
"context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
sz
,
writeSz
,
buf
);
}
return
writeSz
;
...
...
@@ -99,7 +99,7 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
uint64_t
srcLen
=
(
uint64_t
)
(
buf
->
lst
-
buf
->
buf
);
if
(
buf
->
pContext
->
fd
<=
0
)
{
http
Debug
(
"context:%p, fd:%d, ip:%s, write json body error"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, write json body error"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
buf
->
pContext
->
fd
=
-
1
;
}
...
...
@@ -113,11 +113,11 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if
(
buf
->
pContext
->
acceptEncoding
==
HTTP_COMPRESS_IDENTITY
)
{
if
(
buf
->
lst
==
buf
->
buf
)
{
http
Debug
(
"context:%p, fd:%d, ip:%s, no data need dump"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, no data need dump"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
return
0
;
// there is no data to dump.
}
else
{
int
len
=
sprintf
(
sLen
,
"%lx
\r\n
"
,
srcLen
);
http
Debug
(
"context:%p, fd:%d, ip:%s, write body, chunkSize:%"
PRIu64
", response:
\n
%s"
,
http
Trace
(
"context:%p, fd:%d, ip:%s, write body, chunkSize:%"
PRIu64
", response:
\n
%s"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
,
srcLen
,
buf
->
buf
);
httpWriteBufNoTrace
(
buf
->
pContext
,
sLen
,
len
);
remain
=
httpWriteBufNoTrace
(
buf
->
pContext
,
buf
->
buf
,
(
int
)
srcLen
);
...
...
@@ -129,12 +129,12 @@ int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if
(
ret
==
0
)
{
if
(
compressBufLen
>
0
)
{
int
len
=
sprintf
(
sLen
,
"%x
\r\n
"
,
compressBufLen
);
http
Debug
(
"context:%p, fd:%d, ip:%s, write body, chunkSize:%"
PRIu64
", compressSize:%d, last:%d, response:
\n
%s"
,
http
Trace
(
"context:%p, fd:%d, ip:%s, write body, chunkSize:%"
PRIu64
", compressSize:%d, last:%d, response:
\n
%s"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
,
srcLen
,
compressBufLen
,
isTheLast
,
buf
->
buf
);
httpWriteBufNoTrace
(
buf
->
pContext
,
sLen
,
len
);
remain
=
httpWriteBufNoTrace
(
buf
->
pContext
,
(
const
char
*
)
compressBuf
,
(
int
)
compressBufLen
);
}
else
{
http
Debug
(
"context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:
\n
%s"
,
http
Trace
(
"context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:
\n
%s"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
,
isTheLast
,
buf
->
buf
);
return
0
;
// there is no data to dump.
}
...
...
@@ -173,7 +173,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) {
void
httpWriteJsonBufEnd
(
JsonBuf
*
buf
)
{
if
(
buf
->
pContext
->
fd
<=
0
)
{
http
Debug
(
"context:%p, fd:%d, ip:%s, json buf fd is 0"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
http
Trace
(
"context:%p, fd:%d, ip:%s, json buf fd is 0"
,
buf
->
pContext
,
buf
->
pContext
->
fd
,
buf
->
pContext
->
ipstr
);
buf
->
pContext
->
fd
=
-
1
;
}
...
...
src/plugins/http/src/httpServer.c
浏览文件 @
4c1c4f56
...
...
@@ -66,8 +66,6 @@ void httpCleanUpConnect() {
}
}
tfree
(
pServer
->
pThreads
);
pServer
->
pThreads
=
NULL
;
httpDebug
(
"http server:%s is cleaned up"
,
pServer
->
label
);
}
...
...
src/plugins/http/src/httpSystem.c
浏览文件 @
4c1c4f56
...
...
@@ -95,11 +95,13 @@ void httpCleanUpSystem() {
httpInfo
(
"http server cleanup"
);
httpStopSystem
();
httpCleanUpConnect
();
httpCleanupContexts
();
httpCleanUpSessions
();
httpCleanUpConnect
();
pthread_mutex_destroy
(
&
tsHttpServer
.
serverMutex
);
tfree
(
tsHttpServer
.
pThreads
);
tsHttpServer
.
pThreads
=
NULL
;
tsHttpServer
.
status
=
HTTP_SERVER_CLOSED
;
}
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
4c1c4f56
...
...
@@ -233,26 +233,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if
(
tsdbTableSetSName
(
pCfg
,
pMsg
->
superTableId
,
true
)
<
0
)
goto
_err
;
if
(
tsdbTableSetSuperUid
(
pCfg
,
htobe64
(
pMsg
->
superTableUid
))
<
0
)
goto
_err
;
// Decode tag values
if
(
pMsg
->
tagDataLen
)
{
int
accBytes
=
0
;
int32_t
tagDataLen
=
htonl
(
pMsg
->
tagDataLen
);
if
(
tagDataLen
)
{
char
*
pTagData
=
pMsg
->
data
+
(
numOfCols
+
numOfTags
)
*
sizeof
(
SSchema
);
SKVRowBuilder
kvRowBuilder
=
{
0
};
if
(
tdInitKVRowBuilder
(
&
kvRowBuilder
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
for
(
int
i
=
numOfCols
;
i
<
numOfCols
+
numOfTags
;
i
++
)
{
if
(
tdAddColToKVRow
(
&
kvRowBuilder
,
htons
(
pSchema
[
i
].
colId
),
pSchema
[
i
].
type
,
pTagData
+
accBytes
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
accBytes
+=
htons
(
pSchema
[
i
].
bytes
);
}
tsdbTableSetTagValue
(
pCfg
,
tdGetKVRowFromBuilder
(
&
kvRowBuilder
),
false
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tsdbTableSetTagValue
(
pCfg
,
pTagData
,
true
);
}
}
...
...
@@ -620,6 +604,10 @@ static char *getTagIndexKey(const void *pData) {
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
pTable
);
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
DEFAULT_TAG_INDEX_COLUMN
);
void
*
res
=
tdGetKVRowValOfCol
(
pTable
->
tagVal
,
pCol
->
colId
);
if
(
res
==
NULL
)
{
// treat the column as NULL if we cannot find it
res
=
getNullValue
(
pCol
->
type
);
}
return
res
;
}
...
...
src/util/src/tcache.c
浏览文件 @
4c1c4f56
...
...
@@ -119,7 +119,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t
size
=
pNode
->
size
;
taosHashRemove
(
pCacheObj
->
pHashTable
,
pNode
->
key
,
pNode
->
keySize
);
uDebug
(
"key:%s
is removed from cache,total:%"
PRId64
",size:%d
bytes"
,
pNode
->
key
,
pCacheObj
->
totalSize
,
size
);
uDebug
(
"key:%s
, is removed from cache, total:%"
PRId64
" size:%d
bytes"
,
pNode
->
key
,
pCacheObj
->
totalSize
,
size
);
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pNode
->
data
);
free
(
pNode
);
}
...
...
@@ -288,14 +288,14 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
if
(
NULL
!=
pNode
)
{
pCacheObj
->
totalSize
+=
pNode
->
size
;
uDebug
(
"key:%s %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", total:%"
PRId64
", size:%"
PRId64
" bytes"
,
uDebug
(
"key:%s
,
%p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", total:%"
PRId64
", size:%"
PRId64
" bytes"
,
key
,
pNode
,
pNode
->
addedTime
,
pNode
->
expiredTime
,
pCacheObj
->
totalSize
,
dataSize
);
}
else
{
uError
(
"key:%s failed to added into cache, out of memory"
,
key
);
uError
(
"key:%s
,
failed to added into cache, out of memory"
,
key
);
}
}
else
{
// old data exists, update the node
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
uDebug
(
"key:%s %p exist in cache, updated"
,
key
,
pNode
);
uDebug
(
"key:%s
,
%p exist in cache, updated"
,
key
,
pNode
);
}
__cache_unlock
(
pCacheObj
);
...
...
@@ -321,10 +321,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"key:%s is retrieved from cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
uDebug
(
"key:%s
,
is retrieved from cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"key:%s not in cache, retrieved failed"
,
key
);
uDebug
(
"key:%s
,
not in cache, retrieved failed"
,
key
);
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
...
...
@@ -350,10 +350,10 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uin
if
(
ptNode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"key:%s expireTime is updated in cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
uDebug
(
"key:%s
,
expireTime is updated in cache, %p refcnt:%d"
,
key
,
(
*
ptNode
),
T_REF_VAL_GET
(
*
ptNode
));
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"key:%s not in cache, retrieved failed"
,
key
);
uDebug
(
"key:%s
,
not in cache, retrieved failed"
,
key
);
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
...
...
@@ -410,13 +410,13 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
SCacheDataNode
*
pNode
=
(
SCacheDataNode
*
)((
char
*
)(
*
data
)
-
offset
);
if
(
pNode
->
signature
!=
(
uint64_t
)
pNode
)
{
uError
(
"
key:
%p release invalid cache data"
,
pNode
);
uError
(
"%p release invalid cache data"
,
pNode
);
return
;
}
*
data
=
NULL
;
int32_t
ref
=
T_REF_DEC
(
pNode
);
uDebug
(
"
%p data released, refcnt:%d"
,
pNode
,
ref
);
uDebug
(
"
key:%s, is released, %p refcnt:%d"
,
pNode
->
key
,
pNode
,
ref
);
if
(
_remove
)
{
__cache_wr_lock
(
pCacheObj
);
...
...
@@ -501,7 +501,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
pNode
->
inTrashCan
=
true
;
pCacheObj
->
numOfElemsInTrash
++
;
uDebug
(
"key:%s %p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
uDebug
(
"key:%s
,
%p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
}
void
taosRemoveFromTrashCan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
...
...
@@ -549,7 +549,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
}
if
(
force
||
(
T_REF_VAL_GET
(
pElem
->
pData
)
==
0
))
{
uDebug
(
"key:%s %p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
uDebug
(
"key:%s
,
%p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
pCacheObj
->
numOfElemsInTrash
-
1
);
STrashElem
*
p
=
pElem
;
...
...
@@ -570,8 +570,11 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
while
(
taosHashIterNext
(
pIter
))
{
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
//}
if
(
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
uDebug
(
"key:%s, will not remove from cache, refcnt:%d"
,
pNode
->
key
,
T_REF_VAL_GET
(
pNode
));
}
}
taosHashDestroyIter
(
pIter
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录