Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e2738567
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e2738567
编写于
7月 12, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/tsdb_snapshot
上级
98cb6ae0
a67c8560
变更
18
展开全部
隐藏空白更改
内联
并排
Showing
18 changed file
with
179 addition
and
1251 deletion
+179
-1251
include/common/tdataformat.h
include/common/tdataformat.h
+13
-188
include/common/trow.h
include/common/trow.h
+5
-7
include/util/tqueue.h
include/util/tqueue.h
+5
-2
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+39
-309
source/common/src/tglobal.c
source/common/src/tglobal.c
+2
-1
source/common/src/trow.c
source/common/src/trow.c
+2
-662
source/common/test/dataformatTest.cpp
source/common/test/dataformatTest.cpp
+3
-3
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+1
-1
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+1
-1
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+2
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+30
-22
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+12
-9
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+6
-8
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+7
-5
source/libs/transport/test/pushServer.c
source/libs/transport/test/pushServer.c
+4
-2
source/util/src/tqueue.c
source/util/src/tqueue.c
+24
-11
source/util/src/tworker.c
source/util/src/tworker.c
+20
-20
未找到文件。
include/common/tdataformat.h
浏览文件 @
e2738567
...
@@ -64,18 +64,22 @@ int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
...
@@ -64,18 +64,22 @@ int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
int32_t
tGetValue
(
uint8_t
*
p
,
SValue
*
pValue
,
int8_t
type
);
int32_t
tGetValue
(
uint8_t
*
p
,
SValue
*
pValue
,
int8_t
type
);
int
tValueCmprFn
(
const
SValue
*
pValue1
,
const
SValue
*
pValue2
,
int8_t
type
);
int
tValueCmprFn
(
const
SValue
*
pValue1
,
const
SValue
*
pValue2
,
int8_t
type
);
// S
TSRow2
// S
ColVal
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
// STSRow2
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
);
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
);
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
);
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
);
void
tTSRowFree
(
STSRow2
*
pRow
);
void
tTSRowFree
(
STSRow2
*
pRow
);
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
);
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
);
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
*
p
pRow
);
// STSRowBuilder
// STSRowBuilder
#define tsRowBuilderInit() ((STSRowBuilder){0})
#define tsRowBuilderInit() ((STSRowBuilder){0})
...
@@ -97,7 +101,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
...
@@ -97,7 +101,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
);
int32_t
tTagToValArray
(
const
STag
*
pTag
,
SArray
**
ppArray
);
int32_t
tTagToValArray
(
const
STag
*
pTag
,
SArray
**
ppArray
);
void
debugPrintSTag
(
STag
*
pTag
,
const
char
*
tag
,
int32_t
ln
);
// TODO: remove
void
debugPrintSTag
(
STag
*
pTag
,
const
char
*
tag
,
int32_t
ln
);
// TODO: remove
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
// STRUCT =================
// STRUCT =================
struct
STColumn
{
struct
STColumn
{
...
@@ -123,16 +127,16 @@ struct STSchema {
...
@@ -123,16 +127,16 @@ struct STSchema {
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
#pragma pack(push, 1)
struct
STSRow2
{
struct
STSRow2
{
TSKEY
ts
;
TSKEY
ts
;
uint8_t
flags
;
uint8_t
flags
;
int32_t
sver
;
uint8_t
data
[];
uint32_t
nData
;
uint8_t
*
pData
;
};
};
#pragma pack(pop)
struct
STSRowBuilder
{
struct
STSRowBuilder
{
STSRow2
tsRow
;
//
STSRow2 tsRow;
int32_t
szBuf
;
int32_t
szBuf
;
uint8_t
*
pBuf
;
uint8_t
*
pBuf
;
};
};
...
@@ -226,50 +230,6 @@ struct STag {
...
@@ -226,50 +230,6 @@ struct STag {
memcpy(varDataVal(x), (str), (_size)); \
memcpy(varDataVal(x), (str), (_size)); \
} while (0);
} while (0);
// ----------------- TSDB COLUMN DEFINITION
#define colType(col) ((col)->type)
#define colFlags(col) ((col)->flags)
#define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t))
#define colSetFlags(col, f) (colFlags(col) = (f))
#define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o))
// ----------------- TSDB SCHEMA DEFINITION
#define schemaNCols(s) ((s)->numOfCols)
#define schemaVersion(s) ((s)->version)
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaVLen(s) ((s)->vlen)
#define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) taosMemoryFreeClear((s))
STSchema
*
tdDupSchema
(
const
STSchema
*
pSchema
);
int32_t
tdEncodeSchema
(
void
**
buf
,
STSchema
*
pSchema
);
void
*
tdDecodeSchema
(
void
*
buf
,
STSchema
**
pRSchema
);
static
FORCE_INLINE
int32_t
comparColId
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
int16_t
*
)
key1
>
((
STColumn
*
)
key2
)
->
colId
)
{
return
1
;
}
else
if
(
*
(
int16_t
*
)
key1
<
((
STColumn
*
)
key2
)
->
colId
)
{
return
-
1
;
}
else
{
return
0
;
}
}
static
FORCE_INLINE
STColumn
*
tdGetColOfID
(
STSchema
*
pSchema
,
int16_t
colId
)
{
void
*
ptr
=
bsearch
(
&
colId
,
(
void
*
)
pSchema
->
columns
,
schemaNCols
(
pSchema
),
sizeof
(
STColumn
),
comparColId
);
if
(
ptr
==
NULL
)
return
NULL
;
return
(
STColumn
*
)
ptr
;
}
// ----------------- SCHEMA BUILDER DEFINITION
// ----------------- SCHEMA BUILDER DEFINITION
typedef
struct
{
typedef
struct
{
int32_t
tCols
;
int32_t
tCols
;
...
@@ -299,141 +259,6 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version)
...
@@ -299,141 +259,6 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version)
int32_t
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int8_t
flags
,
col_id_t
colId
,
col_bytes_t
bytes
);
int32_t
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int8_t
flags
,
col_id_t
colId
,
col_bytes_t
bytes
);
STSchema
*
tdGetSchemaFromBuilder
(
STSchemaBuilder
*
pBuilder
);
STSchema
*
tdGetSchemaFromBuilder
(
STSchemaBuilder
*
pBuilder
);
// ----------------- Semantic timestamp key definition
// typedef uint64_t TKEY;
#define TKEY TSKEY
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (false)
#define tdGetTKEY(key) (key)
#define tdGetKey(tskey) (tskey)
#define MIN_TS_KEY ((TSKEY)0x8000000000000001)
#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff)
#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key))
static
FORCE_INLINE
TKEY
keyToTkey
(
TSKEY
key
)
{
TSKEY
lkey
=
key
;
if
(
key
>
MAX_TS_KEY
)
{
lkey
=
MAX_TS_KEY
;
}
else
if
(
key
<
MIN_TS_KEY
)
{
lkey
=
MIN_TS_KEY
;
}
return
tdGetTKEY
(
lkey
);
}
static
FORCE_INLINE
int32_t
tkeyComparFn
(
const
void
*
tkey1
,
const
void
*
tkey2
)
{
TSKEY
key1
=
tdGetKey
(
*
(
TKEY
*
)
tkey1
);
TSKEY
key2
=
tdGetKey
(
*
(
TKEY
*
)
tkey2
);
if
(
key1
<
key2
)
{
return
-
1
;
}
else
if
(
key1
>
key2
)
{
return
1
;
}
else
{
return
0
;
}
}
// ----------------- Data column structure
// SDataCol arrangement: data => bitmap => dataOffset
typedef
struct
SDataCol
{
int8_t
type
;
// column type
uint8_t
bitmap
:
1
;
// 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows
uint8_t
reserve
:
7
;
int16_t
colId
;
// column ID
int32_t
bytes
;
// column data bytes defined
int32_t
offset
;
// data offset in a SDataRow (including the header size)
int32_t
spaceSize
;
// Total space size for this column
int32_t
len
;
// column data length
VarDataOffsetT
*
dataOff
;
// For binary and nchar data, the offset in the data column
void
*
pData
;
// Actual data pointer
void
*
pBitmap
;
// Bitmap pointer
TSKEY
ts
;
// only used in last NULL column
}
SDataCol
;
#define isAllRowsNull(pCol) ((pCol)->len == 0)
#define isAllRowsNone(pCol) ((pCol)->len == 0)
static
FORCE_INLINE
void
dataColReset
(
SDataCol
*
pDataCol
)
{
pDataCol
->
len
=
0
;
}
int32_t
tdAllocMemForCol
(
SDataCol
*
pCol
,
int32_t
maxPoints
);
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int32_t
maxPoints
);
int32_t
dataColAppendVal
(
SDataCol
*
pCol
,
const
void
*
value
,
int32_t
numOfRows
,
int32_t
maxPoints
);
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int32_t
nEle
);
bool
isNEleNull
(
SDataCol
*
pCol
,
int32_t
nEle
);
typedef
struct
{
col_id_t
maxCols
;
// max number of columns
col_id_t
numOfCols
;
// Total number of cols
int32_t
maxPoints
;
// max number of points
int32_t
numOfRows
;
int32_t
bitmapMode
:
1
;
// default is 0(2 bits), otherwise 1(1 bit)
int32_t
sversion
:
31
;
// TODO: set sversion(not used yet)
SDataCol
*
cols
;
}
SDataCols
;
static
FORCE_INLINE
bool
tdDataColsIsBitmapI
(
SDataCols
*
pCols
)
{
return
pCols
->
bitmapMode
!=
TSDB_BITMODE_DEFAULT
;
}
static
FORCE_INLINE
void
tdDataColsSetBitmapI
(
SDataCols
*
pCols
)
{
pCols
->
bitmapMode
=
TSDB_BITMODE_ONE_BIT
;
}
static
FORCE_INLINE
bool
tdIsBitmapModeI
(
int8_t
bitmapMode
)
{
return
bitmapMode
!=
TSDB_BITMODE_DEFAULT
;
}
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
static
FORCE_INLINE
TKEY
dataColsTKeyFirst
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsTKeyAt
(
pCols
,
0
);
}
else
{
return
TKEY_INVALID
;
}
}
static
FORCE_INLINE
TSKEY
dataColsKeyAtRow
(
SDataCols
*
pCols
,
int32_t
row
)
{
assert
(
row
<
pCols
->
numOfRows
);
return
dataColsKeyAt
(
pCols
,
row
);
}
static
FORCE_INLINE
TSKEY
dataColsKeyFirst
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsKeyAt
(
pCols
,
0
);
}
else
{
return
TSDB_DATA_TIMESTAMP_NULL
;
}
}
static
FORCE_INLINE
TKEY
dataColsTKeyLast
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsTKeyAt
(
pCols
,
pCols
->
numOfRows
-
1
);
}
else
{
return
TKEY_INVALID
;
}
}
static
FORCE_INLINE
TSKEY
dataColsKeyLast
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsKeyAt
(
pCols
,
pCols
->
numOfRows
-
1
);
}
else
{
return
TSDB_DATA_TIMESTAMP_NULL
;
}
}
SDataCols
*
tdNewDataCols
(
int32_t
maxCols
,
int32_t
maxRows
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
int32_t
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
SDataCols
*
tdDupDataCols
(
SDataCols
*
pCols
,
bool
keepData
);
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
);
int32_t
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int32_t
rowsToMerge
,
int32_t
*
pOffset
,
bool
update
,
TDRowVerT
maxVer
);
#endif
#endif
#ifdef __cplusplus
#ifdef __cplusplus
...
...
include/common/trow.h
浏览文件 @
e2738567
...
@@ -168,7 +168,7 @@ typedef struct {
...
@@ -168,7 +168,7 @@ typedef struct {
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (
schemaTLen(s)
+ TD_ROW_HEAD_LEN)
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (
(s)->tlen
+ TD_ROW_HEAD_LEN)
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
...
@@ -223,9 +223,10 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR
...
@@ -223,9 +223,10 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR
static
FORCE_INLINE
int32_t
tdGetBitmapValType
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
,
static
FORCE_INLINE
int32_t
tdGetBitmapValType
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
,
int8_t
bitmapMode
);
int8_t
bitmapMode
);
bool
tdIsBitmapBlkNorm
(
const
void
*
pBitmap
,
int32_t
numOfBits
,
int8_t
bitmapMode
);
bool
tdIsBitmapBlkNorm
(
const
void
*
pBitmap
,
int32_t
numOfBits
,
int8_t
bitmapMode
);
int32_t
tdAppendValToDataCol
(
SDataCol
*
pCol
,
TDRowValT
valType
,
const
void
*
val
,
int32_t
numOfRows
,
int32_t
maxPoints
,
// int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t
int8_t
bitmapMode
,
bool
isMerge
);
// maxPoints,
int32_t
tdAppendSTSRowToDataCol
(
STSRow
*
pRow
,
STSchema
*
pSchema
,
SDataCols
*
pCols
,
bool
isMerge
);
// int8_t bitmapMode, bool isMerge);
// int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge);
int32_t
tdGetBitmapValTypeII
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
);
int32_t
tdGetBitmapValTypeII
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
);
int32_t
tdSetBitmapValTypeI
(
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
valType
);
int32_t
tdSetBitmapValTypeI
(
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
valType
);
...
@@ -318,12 +319,9 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
...
@@ -318,12 +319,9 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
bool
tdGetTpRowDataOfCol
(
STSRowIter
*
pIter
,
col_type_t
colType
,
int32_t
offset
,
SCellVal
*
pVal
);
bool
tdGetTpRowDataOfCol
(
STSRowIter
*
pIter
,
col_type_t
colType
,
int32_t
offset
,
SCellVal
*
pVal
);
bool
tdGetKvRowValOfColEx
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
col_id_t
*
nIdx
,
SCellVal
*
pVal
);
bool
tdGetKvRowValOfColEx
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
col_id_t
*
nIdx
,
SCellVal
*
pVal
);
bool
tdSTSRowIterNext
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
SCellVal
*
pVal
);
bool
tdSTSRowIterNext
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
SCellVal
*
pVal
);
STSRow
*
mergeTwoRows
(
void
*
buffer
,
STSRow
*
row1
,
STSRow
*
row2
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
);
int32_t
tdGetColDataOfRow
(
SCellVal
*
pVal
,
SDataCol
*
pCol
,
int32_t
row
,
int8_t
bitmapMode
);
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
col_id_t
colIdx
,
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
);
SCellVal
*
pVal
);
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_id_t
colIdx
,
SCellVal
*
pVal
);
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_id_t
colIdx
,
SCellVal
*
pVal
);
int32_t
dataColGetNEleLen
(
SDataCol
*
pDataCol
,
int32_t
rows
,
int8_t
bitmapMode
);
void
tdSCellValPrint
(
SCellVal
*
pVal
,
int8_t
colType
);
void
tdSCellValPrint
(
SCellVal
*
pVal
,
int8_t
colType
);
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
);
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
);
...
...
include/util/tqueue.h
浏览文件 @
e2738567
...
@@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset;
...
@@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset;
typedef
struct
STaosQall
STaosQall
;
typedef
struct
STaosQall
STaosQall
;
typedef
struct
{
typedef
struct
{
void
*
ahandle
;
void
*
ahandle
;
void
*
fp
;
void
*
queue
;
int32_t
workerId
;
int32_t
workerId
;
int32_t
threadNum
;
int32_t
threadNum
;
int64_t
timestamp
;
int64_t
timestamp
;
...
@@ -65,6 +67,7 @@ void taosFreeQitem(void *pItem);
...
@@ -65,6 +67,7 @@ void taosFreeQitem(void *pItem);
void
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
);
void
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
);
int32_t
taosReadQitem
(
STaosQueue
*
queue
,
void
**
ppItem
);
int32_t
taosReadQitem
(
STaosQueue
*
queue
,
void
**
ppItem
);
bool
taosQueueEmpty
(
STaosQueue
*
queue
);
bool
taosQueueEmpty
(
STaosQueue
*
queue
);
void
taosUpdateItemSize
(
STaosQueue
*
queue
,
int32_t
items
);
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
);
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
);
int64_t
taosQueueMemorySize
(
STaosQueue
*
queue
);
int64_t
taosQueueMemorySize
(
STaosQueue
*
queue
);
...
@@ -81,8 +84,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
...
@@ -81,8 +84,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void
taosRemoveFromQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
);
void
taosRemoveFromQset
(
STaosQset
*
qset
,
STaosQueue
*
queue
);
int32_t
taosGetQueueNumber
(
STaosQset
*
qset
);
int32_t
taosGetQueueNumber
(
STaosQset
*
qset
);
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
int64_t
*
ts
,
void
**
ahandle
,
FItem
*
itemFp
);
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
SQueueInfo
*
qinfo
);
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
void
**
ahandle
,
FItems
*
itemsFp
);
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
SQueueInfo
*
qinfo
);
void
taosResetQsetThread
(
STaosQset
*
qset
,
void
*
pItem
);
void
taosResetQsetThread
(
STaosQset
*
qset
,
void
*
pItem
);
extern
int64_t
tsRpcQueueMemoryAllowed
;
extern
int64_t
tsRpcQueueMemoryAllowed
;
...
...
source/common/src/tdataformat.c
浏览文件 @
e2738567
...
@@ -175,7 +175,8 @@ static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) {
...
@@ -175,7 +175,8 @@ static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) {
} while (0)
} while (0)
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
)
{
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
#if 0
STColumn *pTColumn;
STColumn *pTColumn;
SColVal *pColVal;
SColVal *pColVal;
int32_t nColVal = taosArrayGetSize(pArray);
int32_t nColVal = taosArrayGetSize(pArray);
...
@@ -462,30 +463,22 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
...
@@ -462,30 +463,22 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
}
}
}
}
#endif
_exit:
_exit:
return
code
;
return
code
;
}
}
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
)
{
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
rLen
;
(
*
ppRow
)
=
(
STSRow2
*
)
taosMemoryMalloc
(
sizeof
(
**
ppRow
));
TSROW_LEN
(
pRow
,
rLen
);
(
*
ppRow
)
=
(
STSRow2
*
)
taosMemoryMalloc
(
rLen
);
if
(
*
ppRow
==
NULL
)
{
if
(
*
ppRow
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
goto
_exit
;
}
}
**
ppRow
=
*
pRow
;
memcpy
(
*
ppRow
,
pRow
,
rLen
);
(
*
ppRow
)
->
pData
=
NULL
;
if
(
pRow
->
nData
)
{
(
*
ppRow
)
->
pData
=
taosMemoryMalloc
(
pRow
->
nData
);
if
((
*
ppRow
)
->
pData
==
NULL
)
{
taosMemoryFree
(
*
ppRow
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
memcpy
((
*
ppRow
)
->
pData
,
pRow
->
pData
,
pRow
->
nData
);
}
_exit:
_exit:
return
code
;
return
code
;
...
@@ -493,12 +486,12 @@ _exit:
...
@@ -493,12 +486,12 @@ _exit:
void
tTSRowFree
(
STSRow2
*
pRow
)
{
void
tTSRowFree
(
STSRow2
*
pRow
)
{
if
(
pRow
)
{
if
(
pRow
)
{
if
(
pRow
->
pData
)
taosMemoryFree
(
pRow
->
pData
);
taosMemoryFree
(
pRow
);
taosMemoryFree
(
pRow
);
}
}
}
}
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
)
{
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
)
{
#if 0
uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0;
uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0;
STColumn *pTColumn = &pTSchema->columns[iCol];
STColumn *pTColumn = &pTSchema->columns[iCol];
uint8_t flags = pRow->flags & (uint8_t)0xf;
uint8_t flags = pRow->flags & (uint8_t)0xf;
...
@@ -643,10 +636,12 @@ _return_null:
...
@@ -643,10 +636,12 @@ _return_null:
_return_value:
_return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return;
return;
#endif
}
}
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
)
{
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
#if 0
SColVal cv;
SColVal cv;
(*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
(*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
...
@@ -660,52 +655,27 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
...
@@ -660,52 +655,27 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
taosArrayPush(*ppArray, &cv);
taosArrayPush(*ppArray, &cv);
}
}
#endif
_exit:
_exit:
return
code
;
return
code
;
}
}
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
)
{
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
)
{
int32_t
n
=
0
;
int32_t
n
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pRow
->
ts
);
TSROW_LEN
(
pRow
,
n
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pRow
->
flags
);
if
(
p
)
{
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pRow
->
sver
);
memcpy
(
p
,
pRow
,
n
);
ASSERT
(
pRow
->
flags
&
0xf
);
switch
(
pRow
->
flags
&
0xf
)
{
case
TSROW_HAS_NONE
:
case
TSROW_HAS_NULL
:
ASSERT
(
pRow
->
nData
==
0
);
ASSERT
(
pRow
->
pData
==
NULL
);
break
;
default:
ASSERT
(
pRow
->
nData
&&
pRow
->
pData
);
n
+=
tPutBinary
(
p
?
p
+
n
:
p
,
pRow
->
pData
,
pRow
->
nData
);
break
;
}
}
return
n
;
return
n
;
}
}
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
)
{
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
*
p
pRow
)
{
int32_t
n
=
0
;
int32_t
n
;
n
+=
tGetI64
(
p
+
n
,
&
pRow
->
ts
);
*
ppRow
=
(
STSRow2
*
)
p
;
n
+=
tGetI8
(
p
+
n
,
&
pRow
->
flags
);
TSROW_LEN
(
*
ppRow
,
n
);
n
+=
tGetI32v
(
p
+
n
,
&
pRow
->
sver
);
ASSERT
(
pRow
->
flags
);
switch
(
pRow
->
flags
&
0xf
)
{
case
TSROW_HAS_NONE
:
case
TSROW_HAS_NULL
:
pRow
->
nData
=
0
;
pRow
->
pData
=
NULL
;
break
;
default:
n
+=
tGetBinary
(
p
+
n
,
&
pRow
->
pData
,
&
pRow
->
nData
);
break
;
}
return
n
;
return
n
;
}
}
...
@@ -904,15 +874,13 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
...
@@ -904,15 +874,13 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
return
n
;
return
n
;
}
}
bool
tTagIsJson
(
const
void
*
pTag
){
bool
tTagIsJson
(
const
void
*
pTag
)
{
return
(((
const
STag
*
)
pTag
)
->
flags
&
TD_TAG_JSON
);
}
return
(((
const
STag
*
)
pTag
)
->
flags
&
TD_TAG_JSON
);
}
bool
tTagIsJsonNull
(
void
*
data
){
bool
tTagIsJsonNull
(
void
*
data
)
{
STag
*
pTag
=
(
STag
*
)
data
;
STag
*
pTag
=
(
STag
*
)
data
;
int8_t
isJson
=
tTagIsJson
(
pTag
);
int8_t
isJson
=
tTagIsJson
(
pTag
);
if
(
!
isJson
)
return
false
;
if
(
!
isJson
)
return
false
;
return
((
STag
*
)
data
)
->
nTag
==
0
;
return
((
STag
*
)
data
)
->
nTag
==
0
;
}
}
int32_t
tTagNew
(
SArray
*
pArray
,
int32_t
version
,
int8_t
isJson
,
STag
**
ppTag
)
{
int32_t
tTagNew
(
SArray
*
pArray
,
int32_t
version
,
int8_t
isJson
,
STag
**
ppTag
)
{
...
@@ -1097,112 +1065,6 @@ _err:
...
@@ -1097,112 +1065,6 @@ _err:
}
}
#if 1 // ===================================================================================================================
#if 1 // ===================================================================================================================
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
int
tdAllocMemForCol
(
SDataCol
*
pCol
,
int
maxPoints
)
{
int
spaceNeeded
=
pCol
->
bytes
*
maxPoints
;
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
spaceNeeded
+=
sizeof
(
VarDataOffsetT
)
*
maxPoints
;
}
#ifdef TD_SUPPORT_BITMAP
int32_t
nBitmapBytes
=
(
int32_t
)
TD_BITMAP_BYTES
(
maxPoints
);
spaceNeeded
+=
(
int
)
nBitmapBytes
;
// TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
// TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
// spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus
// remove the additional space
#endif
if
(
pCol
->
spaceSize
<
spaceNeeded
)
{
void
*
ptr
=
taosMemoryRealloc
(
pCol
->
pData
,
spaceNeeded
);
if
(
ptr
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
spaceNeeded
,
strerror
(
errno
));
return
-
1
;
}
else
{
pCol
->
pData
=
ptr
;
pCol
->
spaceSize
=
spaceNeeded
;
}
}
#ifdef TD_SUPPORT_BITMAP
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pBitmap
,
nBitmapBytes
);
}
else
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#else
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#endif
return
0
;
}
/**
* Duplicate the schema and return a new object
*/
STSchema
*
tdDupSchema
(
const
STSchema
*
pSchema
)
{
int
tlen
=
sizeof
(
STSchema
)
+
sizeof
(
STColumn
)
*
schemaNCols
(
pSchema
);
STSchema
*
tSchema
=
(
STSchema
*
)
taosMemoryMalloc
(
tlen
);
if
(
tSchema
==
NULL
)
return
NULL
;
memcpy
((
void
*
)
tSchema
,
(
void
*
)
pSchema
,
tlen
);
return
tSchema
;
}
/**
* Encode a schema to dst, and return the next pointer
*/
int
tdEncodeSchema
(
void
**
buf
,
STSchema
*
pSchema
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
schemaVersion
(
pSchema
));
tlen
+=
taosEncodeFixedI32
(
buf
,
schemaNCols
(
pSchema
));
for
(
int
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
tlen
+=
taosEncodeFixedI8
(
buf
,
colType
(
pCol
));
tlen
+=
taosEncodeFixedI8
(
buf
,
colFlags
(
pCol
));
tlen
+=
taosEncodeFixedI16
(
buf
,
colColId
(
pCol
));
tlen
+=
taosEncodeFixedI16
(
buf
,
colBytes
(
pCol
));
}
return
tlen
;
}
/**
* Decode a schema from a binary.
*/
void
*
tdDecodeSchema
(
void
*
buf
,
STSchema
**
pRSchema
)
{
int
version
=
0
;
int
numOfCols
=
0
;
STSchemaBuilder
schemaBuilder
;
buf
=
taosDecodeFixedI32
(
buf
,
&
version
);
buf
=
taosDecodeFixedI32
(
buf
,
&
numOfCols
);
if
(
tdInitTSchemaBuilder
(
&
schemaBuilder
,
version
)
<
0
)
return
NULL
;
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
col_type_t
type
=
0
;
int8_t
flags
=
0
;
col_id_t
colId
=
0
;
col_bytes_t
bytes
=
0
;
buf
=
taosDecodeFixedI8
(
buf
,
&
type
);
buf
=
taosDecodeFixedI8
(
buf
,
&
flags
);
buf
=
taosDecodeFixedI16
(
buf
,
&
colId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
bytes
);
if
(
tdAddColToSchema
(
&
schemaBuilder
,
type
,
flags
,
colId
,
bytes
)
<
0
)
{
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
return
NULL
;
}
}
*
pRSchema
=
tdGetSchemaFromBuilder
(
&
schemaBuilder
);
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
return
buf
;
}
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
schema_ver_t
version
)
{
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
schema_ver_t
version
)
{
if
(
pBuilder
==
NULL
)
return
-
1
;
if
(
pBuilder
==
NULL
)
return
-
1
;
...
@@ -1239,22 +1101,22 @@ int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, c
...
@@ -1239,22 +1101,22 @@ int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, c
}
}
STColumn
*
pCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
]);
STColumn
*
pCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
]);
colSetType
(
pCol
,
type
)
;
pCol
->
type
=
type
;
colSetColId
(
pCol
,
colId
)
;
pCol
->
colId
=
colId
;
colSetFlags
(
pCol
,
flags
)
;
pCol
->
flags
=
flags
;
if
(
pBuilder
->
nCols
==
0
)
{
if
(
pBuilder
->
nCols
==
0
)
{
colSetOffset
(
pCol
,
0
)
;
pCol
->
offset
=
0
;
}
else
{
}
else
{
STColumn
*
pTCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
-
1
]);
STColumn
*
pTCol
=
&
(
pBuilder
->
columns
[
pBuilder
->
nCols
-
1
]);
colSetOffset
(
pCol
,
pTCol
->
offset
+
TYPE_BYTES
[
pTCol
->
type
])
;
pCol
->
offset
=
pTCol
->
offset
+
TYPE_BYTES
[
pTCol
->
type
]
;
}
}
if
(
IS_VAR_DATA_TYPE
(
type
))
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
colSetBytes
(
pCol
,
bytes
)
;
pCol
->
bytes
=
bytes
;
pBuilder
->
tlen
+=
(
TYPE_BYTES
[
type
]
+
bytes
);
pBuilder
->
tlen
+=
(
TYPE_BYTES
[
type
]
+
bytes
);
pBuilder
->
vlen
+=
bytes
-
sizeof
(
VarDataLenT
);
pBuilder
->
vlen
+=
bytes
-
sizeof
(
VarDataLenT
);
}
else
{
}
else
{
colSetBytes
(
pCol
,
TYPE_BYTES
[
type
])
;
pCol
->
bytes
=
TYPE_BYTES
[
type
]
;
pBuilder
->
tlen
+=
TYPE_BYTES
[
type
];
pBuilder
->
tlen
+=
TYPE_BYTES
[
type
];
pBuilder
->
vlen
+=
TYPE_BYTES
[
type
];
pBuilder
->
vlen
+=
TYPE_BYTES
[
type
];
}
}
...
@@ -1275,151 +1137,19 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
...
@@ -1275,151 +1137,19 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
STSchema
*
pSchema
=
(
STSchema
*
)
taosMemoryMalloc
(
tlen
);
STSchema
*
pSchema
=
(
STSchema
*
)
taosMemoryMalloc
(
tlen
);
if
(
pSchema
==
NULL
)
return
NULL
;
if
(
pSchema
==
NULL
)
return
NULL
;
schemaVersion
(
pSchema
)
=
pBuilder
->
version
;
pSchema
->
version
=
pBuilder
->
version
;
schemaNCols
(
pSchema
)
=
pBuilder
->
nCols
;
pSchema
->
numOfCols
=
pBuilder
->
nCols
;
schemaTLen
(
pSchema
)
=
pBuilder
->
tlen
;
pSchema
->
tlen
=
pBuilder
->
tlen
;
schemaFLen
(
pSchema
)
=
pBuilder
->
flen
;
pSchema
->
flen
=
pBuilder
->
flen
;
schemaVLen
(
pSchema
)
=
pBuilder
->
vlen
;
pSchema
->
vlen
=
pBuilder
->
vlen
;
#ifdef TD_SUPPORT_BITMAP
#ifdef TD_SUPPORT_BITMAP
schemaTLen
(
pSchema
)
+=
(
int
)
TD_BITMAP_BYTES
(
schemaNCols
(
pSchema
)
);
pSchema
->
tlen
+=
(
int
)
TD_BITMAP_BYTES
(
pSchema
->
numOfCols
);
#endif
#endif
memcpy
(
schemaColAt
(
pSchema
,
0
)
,
pBuilder
->
columns
,
sizeof
(
STColumn
)
*
pBuilder
->
nCols
);
memcpy
(
&
pSchema
->
columns
[
0
]
,
pBuilder
->
columns
,
sizeof
(
STColumn
)
*
pBuilder
->
nCols
);
return
pSchema
;
return
pSchema
;
}
}
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int
maxPoints
)
{
pDataCol
->
type
=
colType
(
pCol
);
pDataCol
->
colId
=
colColId
(
pCol
);
pDataCol
->
bytes
=
colBytes
(
pCol
);
pDataCol
->
offset
=
colOffset
(
pCol
)
+
0
;
// TD_DATA_ROW_HEAD_SIZE;
pDataCol
->
len
=
0
;
}
static
FORCE_INLINE
const
void
*
tdGetColDataOfRowUnsafe
(
SDataCol
*
pCol
,
int
row
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
return
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
dataOff
[
row
]);
}
else
{
return
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
row
);
}
}
bool
isNEleNull
(
SDataCol
*
pCol
,
int
nEle
)
{
if
(
isAllRowsNull
(
pCol
))
return
true
;
for
(
int
i
=
0
;
i
<
nEle
;
++
i
)
{
if
(
!
isNull
(
tdGetColDataOfRowUnsafe
(
pCol
,
i
),
pCol
->
type
))
return
false
;
}
return
true
;
}
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
)
{
ASSERT
(((
pCol
->
type
==
TSDB_DATA_TYPE_BINARY
)
||
(
pCol
->
type
==
TSDB_DATA_TYPE_NCHAR
)));
void
*
tptr
=
pCol
->
pData
;
// char *tptr = (char *)(pCol->pData);
VarDataOffsetT
offset
=
0
;
for
(
int
i
=
0
;
i
<
nEle
;
++
i
)
{
pCol
->
dataOff
[
i
]
=
offset
;
offset
+=
varDataTLen
(
tptr
);
tptr
=
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
return
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
SDataCols
*
tdNewDataCols
(
int
maxCols
,
int
maxRows
)
{
SDataCols
*
pCols
=
(
SDataCols
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SDataCols
));
if
(
pCols
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
sizeof
(
SDataCols
),
strerror
(
errno
));
return
NULL
;
}
pCols
->
maxPoints
=
maxRows
;
pCols
->
maxCols
=
maxCols
;
pCols
->
numOfRows
=
0
;
pCols
->
numOfCols
=
0
;
pCols
->
bitmapMode
=
TSDB_BITMODE_DEFAULT
;
if
(
maxCols
>
0
)
{
pCols
->
cols
=
(
SDataCol
*
)
taosMemoryCalloc
(
maxCols
,
sizeof
(
SDataCol
));
if
(
pCols
->
cols
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
sizeof
(
SDataCol
)
*
maxCols
,
strerror
(
errno
));
tdFreeDataCols
(
pCols
);
return
NULL
;
}
#if 0 // no need as calloc used
int i;
for (i = 0; i < maxCols; i++) {
pCols->cols[i].spaceSize = 0;
pCols->cols[i].len = 0;
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
}
#endif
}
return
pCols
;
}
int
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
)
{
int
i
;
int
oldMaxCols
=
pCols
->
maxCols
;
if
(
schemaNCols
(
pSchema
)
>
oldMaxCols
)
{
pCols
->
maxCols
=
schemaNCols
(
pSchema
);
void
*
ptr
=
(
SDataCol
*
)
taosMemoryRealloc
(
pCols
->
cols
,
sizeof
(
SDataCol
)
*
pCols
->
maxCols
);
if
(
ptr
==
NULL
)
return
-
1
;
pCols
->
cols
=
ptr
;
for
(
i
=
oldMaxCols
;
i
<
pCols
->
maxCols
;
++
i
)
{
pCols
->
cols
[
i
].
pData
=
NULL
;
pCols
->
cols
[
i
].
dataOff
=
NULL
;
pCols
->
cols
[
i
].
pBitmap
=
NULL
;
pCols
->
cols
[
i
].
spaceSize
=
0
;
}
}
#if 0
tdResetDataCols(pCols); // redundant loop to reset len/blen to 0, already reset in following dataColInit(...)
#endif
pCols
->
numOfRows
=
0
;
pCols
->
bitmapMode
=
TSDB_BITMODE_DEFAULT
;
pCols
->
numOfCols
=
schemaNCols
(
pSchema
);
for
(
i
=
0
;
i
<
schemaNCols
(
pSchema
);
++
i
)
{
dataColInit
(
pCols
->
cols
+
i
,
schemaColAt
(
pSchema
,
i
),
pCols
->
maxPoints
);
}
return
0
;
}
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
)
{
int
i
;
if
(
pCols
)
{
if
(
pCols
->
cols
)
{
int
maxCols
=
pCols
->
maxCols
;
for
(
i
=
0
;
i
<
maxCols
;
++
i
)
{
SDataCol
*
pCol
=
&
pCols
->
cols
[
i
];
taosMemoryFreeClear
(
pCol
->
pData
);
}
taosMemoryFree
(
pCols
->
cols
);
pCols
->
cols
=
NULL
;
}
taosMemoryFree
(
pCols
);
}
return
NULL
;
}
void
tdResetDataCols
(
SDataCols
*
pCols
)
{
if
(
pCols
!=
NULL
)
{
pCols
->
numOfRows
=
0
;
pCols
->
bitmapMode
=
0
;
for
(
int
i
=
0
;
i
<
pCols
->
maxCols
;
++
i
)
{
dataColReset
(
pCols
->
cols
+
i
);
}
}
}
#endif
#endif
\ No newline at end of file
source/common/src/tglobal.c
浏览文件 @
e2738567
...
@@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -412,7 +412,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
2
);
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
2
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
TRANGE
(
tsNumOfVnodeFetchThreads
,
1
,
1
);
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
...
...
source/common/src/trow.c
浏览文件 @
e2738567
此差异已折叠。
点击以展开。
source/common/test/dataformatTest.cpp
浏览文件 @
e2738567
...
@@ -285,8 +285,8 @@ int32_t debugPrintSColVal(SColVal *cv, int8_t type) {
...
@@ -285,8 +285,8 @@ int32_t debugPrintSColVal(SColVal *cv, int8_t type) {
}
}
void
debugPrintTSRow
(
STSRow2
*
row
,
STSchema
*
pTSchema
,
const
char
*
tags
,
int32_t
ln
)
{
void
debugPrintTSRow
(
STSRow2
*
row
,
STSchema
*
pTSchema
,
const
char
*
tags
,
int32_t
ln
)
{
printf
(
"%s:%d %s:v%d:%d "
,
tags
,
ln
,
(
row
->
flags
&
0xf0
)
?
"KV"
:
"TP"
,
row
->
sver
,
row
->
nData
);
//
printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData);
for
(
int16_t
i
=
0
;
i
<
schemaNCols
(
pTSchema
)
;
++
i
)
{
for
(
int16_t
i
=
0
;
i
<
pTSchema
->
numOfCols
;
++
i
)
{
SColVal
cv
=
{
0
};
SColVal
cv
=
{
0
};
tTSRowGet
(
row
,
pTSchema
,
i
,
&
cv
);
tTSRowGet
(
row
,
pTSchema
,
i
,
&
cv
);
debugPrintSColVal
(
&
cv
,
pTSchema
->
columns
[
i
].
type
);
debugPrintSColVal
(
&
cv
,
pTSchema
->
columns
[
i
].
type
);
...
@@ -393,7 +393,7 @@ static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) {
...
@@ -393,7 +393,7 @@ static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) {
}
}
static
void
checkTSRow
(
const
char
**
data
,
STSRow2
*
row
,
STSchema
*
pTSchema
)
{
static
void
checkTSRow
(
const
char
**
data
,
STSRow2
*
row
,
STSchema
*
pTSchema
)
{
for
(
int16_t
i
=
0
;
i
<
schemaNCols
(
pTSchema
)
;
++
i
)
{
for
(
int16_t
i
=
0
;
i
<
pTSchema
->
numOfCols
;
++
i
)
{
SColVal
cv
=
{
0
};
SColVal
cv
=
{
0
};
tTSRowGet
(
row
,
pTSchema
,
i
,
&
cv
);
tTSRowGet
(
row
,
pTSchema
,
i
,
&
cv
);
checkSColVal
(
data
[
i
],
&
cv
,
pTSchema
->
columns
[
i
].
type
);
checkSColVal
(
data
[
i
],
&
cv
,
pTSchema
->
columns
[
i
].
type
);
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
e2738567
...
@@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
...
@@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
const
char
*
path
;
const
char
*
path
;
const
char
*
name
;
const
char
*
name
;
SQWorkerPool
queryPool
;
SQWorkerPool
queryPool
;
S
Q
WorkerPool
fetchPool
;
S
W
WorkerPool
fetchPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
writePool
;
SWWorkerPool
writePool
;
SWWorkerPool
applyPool
;
SWWorkerPool
applyPool
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
e2738567
...
@@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
...
@@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
if
(
pVnode
&&
num
<
size
)
{
if
(
pVnode
&&
num
<
size
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
// dTrace("vgId:%d, acquire vnode
, refCount
:%d", pVnode->vgId, refCount);
// dTrace("vgId:%d, acquire vnode
list, ref
:%d", pVnode->vgId, refCount);
pVnodes
[
num
++
]
=
(
*
ppVnode
);
pVnodes
[
num
++
]
=
(
*
ppVnode
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
else
{
}
else
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
e2738567
...
@@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
...
@@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
taosHashGetDup
(
pMgmt
->
hash
,
&
vgId
,
sizeof
(
int32_t
),
(
void
*
)
&
pVnode
);
taosHashGetDup
(
pMgmt
->
hash
,
&
vgId
,
sizeof
(
int32_t
),
(
void
*
)
&
pVnode
);
if
(
pVnode
==
NULL
||
pVnode
->
dropped
)
{
if
(
pVnode
==
NULL
||
pVnode
->
dropped
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
pVnode
=
NULL
;
}
else
{
}
else
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
...
@@ -82,6 +83,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -82,6 +83,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
dTrace
(
"vgId:%d, wait for vnode ref become 0"
,
pVnode
->
vgId
);
while
(
pVnode
->
refCount
>
0
)
taosMsleep
(
10
);
while
(
pVnode
->
refCount
>
0
)
taosMsleep
(
10
);
dTrace
(
"vgId:%d, wait for vnode queue is empty"
,
pVnode
->
vgId
);
dTrace
(
"vgId:%d, wait for vnode queue is empty"
,
pVnode
->
vgId
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
e2738567
...
@@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -81,21 +81,26 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
taosFreeQitem
(
pMsg
);
}
}
static
void
vmProcessFetchQueue
(
SQueueInfo
*
pInfo
,
S
RpcMsg
*
pMsg
)
{
static
void
vmProcessFetchQueue
(
SQueueInfo
*
pInfo
,
S
TaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
SRpcMsg
*
pMsg
=
NULL
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-fetch queue"
,
pVnode
->
vgId
,
pMsg
);
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
if
(
code
!=
0
)
{
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
terrno
!=
0
)
code
=
terrno
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-fetch queue"
,
pVnode
->
vgId
,
pMsg
);
dGError
(
"vgId:%d, msg:%p failed to fetch since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
vmSendRsp
(
pMsg
,
code
);
}
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
rpcFreeCont
(
pMsg
->
pCont
);
if
(
code
!=
0
)
{
taosFreeQitem
(
pMsg
);
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to fetch since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
vmSendRsp
(
pMsg
,
code
);
}
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
}
static
void
vmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
static
void
vmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
...
@@ -201,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
...
@@ -201,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
int32_t
code
=
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
int32_t
code
=
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
dTrace
(
"msg:%p, is freed"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
rpcFreeCont
(
pMsg
->
pCont
);
pRpc
->
pCont
=
NULL
;
pRpc
->
pCont
=
NULL
;
taosFreeQitem
(
pMsg
);
}
}
return
code
;
return
code
;
...
@@ -232,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
...
@@ -232,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
default:
default:
break
;
break
;
}
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
}
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
size
;
return
size
;
}
}
...
@@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -242,7 +247,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeApplyWriteMsg
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeApplyWriteMsg
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pFetchQ
=
t
QWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetchQueue
);
pVnode
->
pFetchQ
=
t
WWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItems
)
vmProcessFetchQueue
);
if
(
pVnode
->
pWriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pApplyQ
==
NULL
||
pVnode
->
pQueryQ
==
NULL
||
if
(
pVnode
->
pWriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pApplyQ
==
NULL
||
pVnode
->
pQueryQ
==
NULL
||
pVnode
->
pFetchQ
==
NULL
)
{
pVnode
->
pFetchQ
==
NULL
)
{
...
@@ -250,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -250,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return
-
1
;
return
-
1
;
}
}
dDebug
(
"vgId:%d, queue is alloced"
,
pVnode
->
vgId
);
dDebug
(
"vgId:%d, write-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pWriteQ
);
dDebug
(
"vgId:%d, sync-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pSyncQ
);
dDebug
(
"vgId:%d, apply-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pApplyQ
);
dDebug
(
"vgId:%d, query-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pQueryQ
);
dDebug
(
"vgId:%d, fetch-queue:%p is alloced"
,
pVnode
->
vgId
,
pVnode
->
pFetchQ
);
return
0
;
return
0
;
}
}
...
@@ -259,7 +268,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -259,7 +268,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pApplyQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pApplyQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
t
Q
WorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
t
W
WorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
pVnode
->
pWriteQ
=
NULL
;
pVnode
->
pWriteQ
=
NULL
;
pVnode
->
pSyncQ
=
NULL
;
pVnode
->
pSyncQ
=
NULL
;
pVnode
->
pApplyQ
=
NULL
;
pVnode
->
pApplyQ
=
NULL
;
...
@@ -275,11 +284,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -275,11 +284,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
pQPool
->
max
=
tsNumOfVnodeQueryThreads
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
if
(
tQWorkerInit
(
pQPool
)
!=
0
)
return
-
1
;
S
Q
WorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
S
W
WorkerPool
*
pFPool
=
&
pMgmt
->
fetchPool
;
pFPool
->
name
=
"vnode-fetch"
;
pFPool
->
name
=
"vnode-fetch"
;
pFPool
->
min
=
tsNumOfVnodeFetchThreads
;
pFPool
->
max
=
tsNumOfVnodeFetchThreads
;
pFPool
->
max
=
tsNumOfVnodeFetchThreads
;
if
(
t
Q
WorkerInit
(
pFPool
)
!=
0
)
return
-
1
;
if
(
t
W
WorkerInit
(
pFPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pWPool
=
&
pMgmt
->
writePool
;
SWWorkerPool
*
pWPool
=
&
pMgmt
->
writePool
;
pWPool
->
name
=
"vnode-write"
;
pWPool
->
name
=
"vnode-write"
;
...
@@ -325,6 +333,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
...
@@ -325,6 +333,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tWWorkerCleanup
(
&
pMgmt
->
applyPool
);
tWWorkerCleanup
(
&
pMgmt
->
applyPool
);
tWWorkerCleanup
(
&
pMgmt
->
syncPool
);
tWWorkerCleanup
(
&
pMgmt
->
syncPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
t
Q
WorkerCleanup
(
&
pMgmt
->
fetchPool
);
t
W
WorkerCleanup
(
&
pMgmt
->
fetchPool
);
dDebug
(
"vnode workers are closed"
);
dDebug
(
"vnode workers are closed"
);
}
}
source/dnode/vnode/src/tq/tq.c
浏览文件 @
e2738567
...
@@ -244,6 +244,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -244,6 +244,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal
reqOffset
=
pReq
->
reqOffset
;
STqOffsetVal
reqOffset
=
pReq
->
reqOffset
;
STqOffsetVal
fetchOffsetNew
;
STqOffsetVal
fetchOffsetNew
;
// todo
workerId
=
0
;
// 1.find handle
// 1.find handle
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
handles
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
handles
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
/*ASSERT(pHandle);*/
/*ASSERT(pHandle);*/
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
e2738567
...
@@ -420,8 +420,8 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
...
@@ -420,8 +420,8 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
}
}
bool
inSlidingWindow
(
SInterval
*
pInterval
,
STimeWindow
*
pWin
,
SDataBlockInfo
*
pBlockInfo
)
{
bool
inSlidingWindow
(
SInterval
*
pInterval
,
STimeWindow
*
pWin
,
SDataBlockInfo
*
pBlockInfo
)
{
if
(
pInterval
->
interval
!=
pInterval
->
sliding
&&
(
pWin
->
ekey
<
pBlockInfo
->
calWin
.
skey
||
if
(
pInterval
->
interval
!=
pInterval
->
sliding
&&
pWin
->
skey
>
pBlockInfo
->
calWin
.
ekey
)
)
{
(
pWin
->
ekey
<
pBlockInfo
->
calWin
.
skey
||
pWin
->
skey
>
pBlockInfo
->
calWin
.
ekey
)
)
{
return
false
;
return
false
;
}
}
return
true
;
return
true
;
...
@@ -813,7 +813,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
...
@@ -813,7 +813,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
order
);
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
order
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
@@ -846,7 +847,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
...
@@ -846,7 +847,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
win
,
startPos
,
forwardRows
,
pSup
);
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
win
,
startPos
,
forwardRows
,
pSup
);
}
}
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
order
);
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
order
);
...
@@ -928,7 +930,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
...
@@ -928,7 +930,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
if
(
tsCols
!=
NULL
)
{
if
(
tsCols
!=
NULL
)
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
}
}
}
}
...
@@ -1290,15 +1292,16 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
...
@@ -1290,15 +1292,16 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
pGpDatas
=
(
uint64_t
*
)
pGpCol
->
pData
;
pGpDatas
=
(
uint64_t
*
)
pGpCol
->
pData
;
}
}
int32_t
step
=
0
;
int32_t
step
=
0
;
int32_t
startPos
=
0
;
int32_t
startPos
=
0
;
SResultRowInfo
dumyInfo
;
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
0
],
pInterval
,
TSDB_ORDER_ASC
);
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
0
],
pInterval
,
TSDB_ORDER_ASC
);
while
(
1
)
{
while
(
1
)
{
step
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
step
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
uint64_t
winGpId
=
pGpDatas
?
pGpDatas
[
startPos
]
:
pBlock
->
info
.
groupId
;
uint64_t
winGpId
=
pGpDatas
?
pGpDatas
[
startPos
]
:
pBlock
->
info
.
groupId
;
bool
res
=
doClearWindow
(
pAggSup
,
pSup1
,
(
char
*
)
&
win
.
skey
,
sizeof
(
TKEY
),
winGpId
,
numOfOutput
);
bool
res
=
doClearWindow
(
pAggSup
,
pSup1
,
(
char
*
)
&
win
.
skey
,
sizeof
(
T
S
KEY
),
winGpId
,
numOfOutput
);
if
(
pUpWins
&&
res
)
{
if
(
pUpWins
&&
res
)
{
SWinRes
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
SWinRes
winRes
=
{.
ts
=
win
.
skey
,
.
groupId
=
winGpId
};
taosArrayPush
(
pUpWins
,
&
winRes
);
taosArrayPush
(
pUpWins
,
&
winRes
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
e2738567
...
@@ -1144,9 +1144,9 @@ static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBl
...
@@ -1144,9 +1144,9 @@ static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBl
static
int32_t
findRowIndex
(
int32_t
start
,
int32_t
num
,
SColumnInfoData
*
pCol
,
const
char
*
tval
)
{
static
int32_t
findRowIndex
(
int32_t
start
,
int32_t
num
,
SColumnInfoData
*
pCol
,
const
char
*
tval
)
{
// the data is loaded, not only the block SMA value
// the data is loaded, not only the block SMA value
for
(
int32_t
i
=
start
;
i
<
num
+
start
;
++
i
)
{
for
(
int32_t
i
=
start
;
i
<
num
+
start
;
++
i
)
{
char
*
p
=
colDataGetData
(
pCol
,
i
);
char
*
p
=
colDataGetData
(
pCol
,
i
);
if
(
memcpy
((
void
*
)
tval
,
p
,
pCol
->
info
.
bytes
)
==
0
)
{
if
(
memcpy
((
void
*
)
tval
,
p
,
pCol
->
info
.
bytes
)
==
0
)
{
return
i
;
return
i
;
}
}
}
}
...
@@ -1154,7 +1154,6 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
...
@@ -1154,7 +1154,6 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
ASSERT
(
0
);
ASSERT
(
0
);
}
}
int32_t
doMinMaxHelper
(
SqlFunctionCtx
*
pCtx
,
int32_t
isMinFunc
)
{
int32_t
doMinMaxHelper
(
SqlFunctionCtx
*
pCtx
,
int32_t
isMinFunc
)
{
int32_t
numOfElems
=
0
;
int32_t
numOfElems
=
0
;
...
@@ -1938,7 +1937,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
...
@@ -1938,7 +1937,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
SStddevRes
*
pInfo
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
SStddevRes
*
pInfo
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
startRowIndex
+
pInput
->
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
startRowIndex
+
pInput
->
numOfRows
;
++
i
)
{
char
*
data
=
colDataGetData
(
pCol
,
i
);
char
*
data
=
colDataGetData
(
pCol
,
i
);
SStddevRes
*
pInputInfo
=
(
SStddevRes
*
)
varDataVal
(
data
);
SStddevRes
*
pInputInfo
=
(
SStddevRes
*
)
varDataVal
(
data
);
stddevTransferInfo
(
pInputInfo
,
pInfo
);
stddevTransferInfo
(
pInputInfo
,
pInfo
);
...
@@ -3512,8 +3511,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
...
@@ -3512,8 +3511,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
setBufPageDirty
(
pPage
,
true
);
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pCtx
->
pBuf
,
pPage
);
releaseBufPage
(
pCtx
->
pBuf
,
pPage
);
#ifdef BUF_PAGE_DEBUG
#ifdef BUF_PAGE_DEBUG
qDebug
(
"page_saveTuple pos:%p,pageId:%d, offset:%d
\n
"
,
pPos
,
pPos
->
pageId
,
qDebug
(
"page_saveTuple pos:%p,pageId:%d, offset:%d
\n
"
,
pPos
,
pPos
->
pageId
,
pPos
->
offset
);
pPos
->
offset
);
#endif
#endif
}
}
...
@@ -3814,7 +3812,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
...
@@ -3814,7 +3812,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
SElapsedInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
SElapsedInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
pInfo
->
result
=
0
;
pInfo
->
result
=
0
;
pInfo
->
min
=
MAX_TS_KEY
;
pInfo
->
min
=
TSKEY_MAX
;
pInfo
->
max
=
0
;
pInfo
->
max
=
0
;
if
(
pCtx
->
numOfParams
>
1
)
{
if
(
pCtx
->
numOfParams
>
1
)
{
...
@@ -3841,7 +3839,7 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
...
@@ -3841,7 +3839,7 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
}
}
if
(
pInput
->
colDataAggIsSet
)
{
if
(
pInput
->
colDataAggIsSet
)
{
if
(
pInfo
->
min
==
MAX_TS_KEY
)
{
if
(
pInfo
->
min
==
TSKEY_MAX
)
{
pInfo
->
min
=
GET_INT64_VAL
(
&
pAgg
->
min
);
pInfo
->
min
=
GET_INT64_VAL
(
&
pAgg
->
min
);
pInfo
->
max
=
GET_INT64_VAL
(
&
pAgg
->
max
);
pInfo
->
max
=
GET_INT64_VAL
(
&
pAgg
->
max
);
}
else
{
}
else
{
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
e2738567
...
@@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
...
@@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
}
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
STaosQall
*
qall
=
taosAllocateQall
()
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
qall
=
taosAllocateQall
()
;
SQueueInfo
qinfo
=
{
0
}
;
while
(
1
)
{
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
io
->
pQset
,
qall
,
NULL
,
NULL
);
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
io
->
pQset
,
qall
,
&
qinfo
);
sTrace
(
"syncIOConsumerFunc %d msgs are received"
,
numOfMsgs
);
sTrace
(
"syncIOConsumerFunc %d msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
{
if
(
numOfMsgs
<=
0
)
{
break
;
break
;
...
@@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) {
...
@@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) {
taosFreeQitem
(
pRpcMsg
);
taosFreeQitem
(
pRpcMsg
);
}
}
taosUpdateItemSize
(
qinfo
.
queue
,
numOfMsgs
);
}
}
taosFreeQall
(
qall
);
taosFreeQall
(
qall
);
...
...
source/libs/transport/test/pushServer.c
浏览文件 @
e2738567
...
@@ -31,12 +31,12 @@ void processShellMsg() {
...
@@ -31,12 +31,12 @@ void processShellMsg() {
STaosQall
*
qall
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
int
type
;
void
*
pvnode
;
SQueueInfo
qinfo
=
{
0
}
;
qall
=
taosAllocateQall
();
qall
=
taosAllocateQall
();
while
(
1
)
{
while
(
1
)
{
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
qset
,
qall
,
&
pvnode
,
NULL
);
int
numOfMsgs
=
taosReadAllQitemsFromQset
(
qset
,
qall
,
&
qinfo
);
tDebug
(
"%d shell msgs are received"
,
numOfMsgs
);
tDebug
(
"%d shell msgs are received"
,
numOfMsgs
);
if
(
numOfMsgs
<=
0
)
break
;
if
(
numOfMsgs
<=
0
)
break
;
...
@@ -86,6 +86,8 @@ void processShellMsg() {
...
@@ -86,6 +86,8 @@ void processShellMsg() {
rpcSendResponse
(
&
nRpcMsg
);
rpcSendResponse
(
&
nRpcMsg
);
}
}
}
}
taosUpdateItemSize
(
qinfo
.
queue
,
numOfMsgs
);
}
}
taosFreeQall
(
qall
);
taosFreeQall
(
qall
);
...
...
source/util/src/tqueue.c
浏览文件 @
e2738567
...
@@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
...
@@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
bool
empty
=
false
;
bool
empty
=
false
;
taosThreadMutexLock
(
&
queue
->
mutex
);
taosThreadMutexLock
(
&
queue
->
mutex
);
if
(
queue
->
head
==
NULL
&&
queue
->
tail
==
NULL
)
{
if
(
queue
->
head
==
NULL
&&
queue
->
tail
==
NULL
&&
queue
->
numOfItems
==
0
&&
queue
->
memOfItems
==
0
)
{
empty
=
true
;
empty
=
true
;
}
}
taosThreadMutexUnlock
(
&
queue
->
mutex
);
taosThreadMutexUnlock
(
&
queue
->
mutex
);
...
@@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) {
...
@@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) {
return
empty
;
return
empty
;
}
}
void
taosUpdateItemSize
(
STaosQueue
*
queue
,
int32_t
items
)
{
if
(
queue
==
NULL
)
return
;
taosThreadMutexLock
(
&
queue
->
mutex
);
queue
->
numOfItems
-=
items
;
taosThreadMutexUnlock
(
&
queue
->
mutex
);
}
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
)
{
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
0
;
if
(
queue
==
NULL
)
return
0
;
...
@@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
...
@@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
queue
->
tail
=
NULL
;
queue
->
tail
=
NULL
;
queue
->
numOfItems
=
0
;
queue
->
numOfItems
=
0
;
queue
->
memOfItems
=
0
;
queue
->
memOfItems
=
0
;
uTrace
(
"read %d items from queue:%p, items:%d mem:%"
PRId64
,
code
,
queue
,
queue
->
numOfItems
,
queue
->
memOfItems
);
if
(
queue
->
qset
)
atomic_sub_fetch_32
(
&
queue
->
qset
->
numOfItems
,
qall
->
numOfItems
);
if
(
queue
->
qset
)
atomic_sub_fetch_32
(
&
queue
->
qset
->
numOfItems
,
qall
->
numOfItems
);
}
}
...
@@ -397,7 +406,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
...
@@ -397,7 +406,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t
taosGetQueueNumber
(
STaosQset
*
qset
)
{
return
qset
->
numOfQueues
;
}
int32_t
taosGetQueueNumber
(
STaosQset
*
qset
)
{
return
qset
->
numOfQueues
;
}
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
int64_t
*
ts
,
void
**
ahandle
,
FItem
*
itemFp
)
{
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
SQueueInfo
*
qinfo
)
{
STaosQnode
*
pNode
=
NULL
;
STaosQnode
*
pNode
=
NULL
;
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -417,17 +426,18 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
...
@@ -417,17 +426,18 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
if
(
queue
->
head
)
{
if
(
queue
->
head
)
{
pNode
=
queue
->
head
;
pNode
=
queue
->
head
;
*
ppItem
=
pNode
->
item
;
*
ppItem
=
pNode
->
item
;
if
(
ahandle
)
*
ahandle
=
queue
->
ahandle
;
qinfo
->
ahandle
=
queue
->
ahandle
;
if
(
itemFp
)
*
itemFp
=
queue
->
itemFp
;
qinfo
->
fp
=
queue
->
itemFp
;
if
(
ts
)
*
ts
=
pNode
->
timestamp
;
qinfo
->
queue
=
queue
;
qinfo
->
timestamp
=
pNode
->
timestamp
;
queue
->
head
=
pNode
->
next
;
queue
->
head
=
pNode
->
next
;
if
(
queue
->
head
==
NULL
)
queue
->
tail
=
NULL
;
if
(
queue
->
head
==
NULL
)
queue
->
tail
=
NULL
;
queue
->
numOfItems
--
;
//
queue->numOfItems--;
queue
->
memOfItems
-=
pNode
->
size
;
queue
->
memOfItems
-=
pNode
->
size
;
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
1
);
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
1
);
code
=
1
;
code
=
1
;
uTrace
(
"item:%p is read out from queue:%p, items:%d mem:%"
PRId64
,
*
ppItem
,
queue
,
queue
->
numOfItems
,
uTrace
(
"item:%p is read out from queue:%p, items:%d mem:%"
PRId64
,
*
ppItem
,
queue
,
queue
->
numOfItems
-
1
,
queue
->
memOfItems
);
queue
->
memOfItems
);
}
}
...
@@ -440,7 +450,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
...
@@ -440,7 +450,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
return
code
;
return
code
;
}
}
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
void
**
ahandle
,
FItems
*
itemsFp
)
{
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
SQueueInfo
*
qinfo
)
{
STaosQueue
*
queue
;
STaosQueue
*
queue
;
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -461,13 +471,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
...
@@ -461,13 +471,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
qall
->
start
=
queue
->
head
;
qall
->
start
=
queue
->
head
;
qall
->
numOfItems
=
queue
->
numOfItems
;
qall
->
numOfItems
=
queue
->
numOfItems
;
code
=
qall
->
numOfItems
;
code
=
qall
->
numOfItems
;
if
(
ahandle
)
*
ahandle
=
queue
->
ahandle
;
qinfo
->
ahandle
=
queue
->
ahandle
;
if
(
itemsFp
)
*
itemsFp
=
queue
->
itemsFp
;
qinfo
->
fp
=
queue
->
itemsFp
;
qinfo
->
queue
=
queue
;
queue
->
head
=
NULL
;
queue
->
head
=
NULL
;
queue
->
tail
=
NULL
;
queue
->
tail
=
NULL
;
queue
->
numOfItems
=
0
;
//
queue->numOfItems = 0;
queue
->
memOfItems
=
0
;
queue
->
memOfItems
=
0
;
uTrace
(
"read %d items from queue:%p, items:0 mem:%"
PRId64
,
code
,
queue
,
queue
->
memOfItems
);
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
qall
->
numOfItems
);
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
qall
->
numOfItems
);
for
(
int32_t
j
=
1
;
j
<
qall
->
numOfItems
;
++
j
)
{
for
(
int32_t
j
=
1
;
j
<
qall
->
numOfItems
;
++
j
)
{
tsem_wait
(
&
qset
->
sem
);
tsem_wait
(
&
qset
->
sem
);
...
...
source/util/src/tworker.c
浏览文件 @
e2738567
...
@@ -70,27 +70,27 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
...
@@ -70,27 +70,27 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
static
void
*
tQWorkerThreadFp
(
SQWorker
*
worker
)
{
SQWorkerPool
*
pool
=
worker
->
pool
;
SQWorkerPool
*
pool
=
worker
->
pool
;
FItem
fp
=
NULL
;
SQueueInfo
qinfo
=
{
0
};
void
*
msg
=
NULL
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
void
*
ahandle
=
NULL
;
int32_t
code
=
0
;
int64_t
ts
=
0
;
taosBlockSIGPIPE
();
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
setThreadName
(
pool
->
name
);
uDebug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
uDebug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
while
(
1
)
{
while
(
1
)
{
if
(
taosReadQitemFromQset
(
pool
->
qset
,
(
void
**
)
&
msg
,
&
ts
,
&
ahandle
,
&
fp
)
==
0
)
{
if
(
taosReadQitemFromQset
(
pool
->
qset
,
(
void
**
)
&
msg
,
&
qinfo
)
==
0
)
{
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
pool
->
qset
);
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
pool
->
qset
);
break
;
break
;
}
}
if
(
fp
!=
NULL
)
{
if
(
qinfo
.
fp
!=
NULL
)
{
SQueueInfo
info
=
{.
ahandle
=
ahandle
,
.
workerId
=
worker
->
id
,
.
threadNum
=
pool
->
num
,
.
timestamp
=
ts
};
qinfo
.
workerId
=
worker
->
id
;
(
*
fp
)(
&
info
,
msg
);
qinfo
.
threadNum
=
pool
->
num
;
(
*
((
FItem
)
qinfo
.
fp
))(
&
qinfo
,
msg
);
}
}
taosUpdateItemSize
(
qinfo
.
queue
,
1
);
}
}
return
NULL
;
return
NULL
;
...
@@ -195,28 +195,28 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
...
@@ -195,28 +195,28 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
static
void
*
tWWorkerThreadFp
(
SWWorker
*
worker
)
{
SWWorkerPool
*
pool
=
worker
->
pool
;
SWWorkerPool
*
pool
=
worker
->
pool
;
FItems
fp
=
NULL
;
SQueueInfo
qinfo
=
{
0
};
void
*
msg
=
NULL
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
void
*
ahandle
=
NULL
;
int32_t
numOfMsgs
=
0
;
int32_t
numOfMsgs
=
0
;
int32_t
qtype
=
0
;
taosBlockSIGPIPE
();
taosBlockSIGPIPE
();
setThreadName
(
pool
->
name
);
setThreadName
(
pool
->
name
);
uDebug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
uDebug
(
"worker:%s:%d is running"
,
pool
->
name
,
worker
->
id
);
while
(
1
)
{
while
(
1
)
{
numOfMsgs
=
taosReadAllQitemsFromQset
(
worker
->
qset
,
worker
->
qall
,
&
ahandle
,
&
fp
);
numOfMsgs
=
taosReadAllQitemsFromQset
(
worker
->
qset
,
worker
->
qall
,
&
qinfo
);
if
(
numOfMsgs
==
0
)
{
if
(
numOfMsgs
==
0
)
{
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
worker
->
qset
);
uDebug
(
"worker:%s:%d qset:%p, got no message and exiting"
,
pool
->
name
,
worker
->
id
,
worker
->
qset
);
break
;
break
;
}
}
if
(
fp
!=
NULL
)
{
if
(
qinfo
.
fp
!=
NULL
)
{
SQueueInfo
info
=
{.
ahandle
=
ahandle
,
.
workerId
=
worker
->
id
,
.
threadNum
=
pool
->
num
};
qinfo
.
workerId
=
worker
->
id
;
(
*
fp
)(
&
info
,
worker
->
qall
,
numOfMsgs
);
qinfo
.
threadNum
=
pool
->
num
;
(
*
((
FItems
)
qinfo
.
fp
))(
&
qinfo
,
worker
->
qall
,
numOfMsgs
);
}
}
taosUpdateItemSize
(
qinfo
.
queue
,
numOfMsgs
);
}
}
return
NULL
;
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录