提交 584ce8a9 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into enh/3.0_planner_optimize

......@@ -205,13 +205,13 @@ Additional functions are defined in `taosudf.h` to make it easier to work with t
To use your user-defined function in TDengine, first compile it to a dynamically linked library (DLL).
For example, the sample UDF `add_one.c` can be compiled into a DLL as follows:
For example, the sample UDF `bit_and.c` can be compiled into a DLL as follows:
```bash
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
```
The generated DLL file `add_one.so` can now be used to implement your function. Note: GCC 7.5 or later is required.
The generated DLL file `libbitand.so` can now be used to implement your function. Note: GCC 7.5 or later is required.
## Manage and Use User-Defined Functions
After compiling your function into a DLL, you add it to TDengine. For more information, see [User-Defined Functions](../12-taos-sql/26-udf.md).
......
......@@ -62,7 +62,7 @@ SHOW FUNCTIONS;
The function name specified when creating UDF can be used directly in SQL statements, just like builtin functions. For example:
```sql
SELECT X(c1,c2) FROM table/stable;
SELECT bit_and(c1,c2) FROM table;
```
The above SQL statement invokes function X for column c1 and c2. You can use query keywords like WHERE with user-defined functions.
The above SQL statement invokes function X for column c1 and c2 on table. You can use query keywords like WHERE with user-defined functions.
......@@ -10,4 +10,4 @@ chrono = "0.4"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
taos = { version = "0.*" }
taos = { version = "0.4.8" }
......@@ -12,7 +12,10 @@ async fn main() -> anyhow::Result<()> {
// bind table name and tags
stmt.set_tbname_tags(
"d1001",
&[Value::VarChar("California.SanFransico".into()), Value::Int(2)],
&[
Value::VarChar("California.SanFransico".into()),
Value::Int(2),
],
)?;
// bind values.
let values = vec![
......@@ -30,9 +33,9 @@ async fn main() -> anyhow::Result<()> {
ColumnView::from_floats(vec![0.33]),
];
stmt.bind(&values2)?;
stmt.add_batch()?;
// execute.
let rows = stmt.execute()?;
assert_eq!(rows, 2);
......
......@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
format!("CREATE TOPIC tmq_meters AS SELECT * FROM `meters`")
])
.await?;
......@@ -64,13 +64,9 @@ async fn main() -> anyhow::Result<()> {
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
consumer
.stream()
.try_for_each(|(offset, message)| async {
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
......@@ -78,20 +74,14 @@ async fn main() -> anyhow::Result<()> {
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
println!("** read {} records: {:#?}\n", records.len(), records);
}
}
consumer.commit(offset).await?;
}
}
Ok(())
})
.await?;
consumer.unsubscribe().await;
......
......@@ -5,7 +5,6 @@ async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
taos.exec_many([
"DROP DATABASE IF EXISTS power",
"CREATE DATABASE power",
......
......@@ -18,7 +18,18 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
### 加入 TDengine 官方社区
## 学习 TDengine 知识地图
TDengine 知识地图中涵盖了 TDengine 的各种知识点,揭示了各概念实体之间的调用关系和数据流向。学习和了解 TDengine 知识地图有助于你快速掌握 TDengine 的知识体系。
<figure>
<center>
<a href="/img/tdengine-map.svg" target="_blank"><img src="/img/tdengine-map.svg" width="80%" /></a>
<figcaption>图 1. TDengine 知识地图</figcaption>
</center>
</figure>
## 加入 TDengine 官方社区
微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
......
......@@ -205,13 +205,13 @@ typedef struct SUdfInterBuf {
用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。
例如,按照上一章节描述的规则准备好了用户定义函数的源代码 add_one.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件:
例如,按照上一章节描述的规则准备好了用户定义函数的源代码 bit_and.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件:
```bash
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
```
这样就准备好了动态链接库 add_one.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
这样就准备好了动态链接库 libbitand.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
## 管理和使用UDF
编译好的UDF,还需要将其加入到系统才能被正常的SQL调用。关于如何管理和使用UDF,参见[UDF使用说明](../12-taos-sql/26-udf.md)
......
......@@ -63,7 +63,7 @@ SHOW FUNCTIONS;
在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
```sql
SELECT X(c1,c2) FROM table/stable;
SELECT bit_and(c1,c2) FROM table;
```
表示对名为 c1, c2 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
表示对表 table 上名为 c1, c2 的数据列调用名为 bit_and 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。
......@@ -27,16 +27,21 @@ typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
// incremental state storage
typedef struct {
typedef struct STdbState {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TTB* pParNameDb;
TXN txn;
int32_t number;
} STdbState;
// incremental state storage
typedef struct {
STdbState* pTdbState;
int32_t number;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
......@@ -44,6 +49,7 @@ void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
void streamStateDestroy(SStreamState* pState);
typedef struct {
TBC* pCur;
......@@ -99,6 +105,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
#if 0
char* streamStateSessionDump(SStreamState* pState);
#endif
......
......@@ -352,6 +352,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE TAOS_DEF_ERROR_CODE(0, 0x061D)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......
......@@ -63,7 +63,7 @@ Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNor
[run]
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe --win_service""" ; Flags: runhidden
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe""" ; Flags: runhidden
[UninstallRun]
RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden
......
......@@ -56,6 +56,9 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
} else {
rpcFreeCont(pMsg->info.rsp);
pMsg->info.rsp = NULL;
}
if (code == TSDB_CODE_RPC_REDIRECT) {
......
......@@ -577,9 +577,9 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
pMsg->info.hasEpSet = 1;
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.hasEpSet = 1;
pMsg->info.rspLen = contLen;
terrno = TSDB_CODE_RPC_REDIRECT;
} else {
......
......@@ -403,6 +403,11 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
// validate req
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
metaReaderClear(&mr);
return -1;
}
pReq->uid = mr.me.uid;
if (pReq->type == TSDB_CHILD_TABLE) {
pReq->ctb.suid = mr.me.ctbEntry.suid;
......
......@@ -398,6 +398,35 @@ bool tqNextDataBlock(STqReader* pReader) {
return false;
}
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
int32_t code;
int32_t cnt = 0;
for (int32_t i = 0; i < pSrc->nCols; i++) {
cnt += mask[i];
}
pDst->nCols = cnt;
pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
if (pDst->pSchema == NULL) {
return -1;
}
int32_t j = 0;
for (int32_t i = 0; i < pSrc->nCols; i++) {
if (mask[i]) {
pDst->pSchema[j++] = pSrc->pSchema[i];
SColumnInfoData colInfo =
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != 0) {
return -1;
}
}
}
return 0;
}
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
......@@ -527,6 +556,119 @@ FAIL:
return -1;
}
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t sversion = htonl(pReader->pBlock->sversion);
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
pReader->cachedSchemaSuid != pReader->msgIter.suid) {
if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->msgIter.uid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
pReader->cachedSchemaVer = sversion;
pReader->cachedSchemaSuid = pReader->msgIter.suid;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t colAtMost = pSchemaWrapper->nCols;
int32_t curRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned) return -1;
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
bool buildNew = false;
tdSTSRowIterReset(&iter, row);
for (int32_t i = 0; i < colAtMost; i++) {
SCellVal sVal = {0};
if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) {
break;
}
if (curRow == 0) {
assigned[i] = sVal.valType != TD_VTYPE_NONE;
buildNew = true;
} else {
bool currentRowAssigned = sVal.valType != TD_VTYPE_NONE;
if (currentRowAssigned != assigned[i]) {
assigned[i] = currentRowAssigned;
buildNew = true;
}
}
}
if (buildNew) {
SSDataBlock block;
SSchemaWrapper sw;
if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) {
goto FAIL;
}
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &sw);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
pBlock->info.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows;
pBlock->info.version = pReader->pMsg->version;
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
tdSTSRowIterInit(&iter, pTschema);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SCellVal sVal = {0};
if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
ASSERT(sVal.valType != TD_VTYPE_NONE);
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
goto FAIL;
}
}
curRow++;
}
taosMemoryFree(assigned);
return 0;
FAIL:
taosMemoryFree(assigned);
return -1;
}
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
......
......@@ -1698,7 +1698,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
size += pBlockCol->szBitmap;
// offset
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (IS_VAR_DATA_TYPE(pColData->type) && pColData->flag != (HAS_NULL | HAS_NONE)) {
code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg,
ppOut, nOut + size, &pBlockCol->szOffset, ppBuf);
if (code) goto _exit;
......
......@@ -163,7 +163,7 @@ typedef struct {
SArray* pStopInfo;
} STaskStopInfo;
typedef struct SExecTaskInfo {
struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
STimeWindow window;
......@@ -182,7 +182,7 @@ typedef struct SExecTaskInfo {
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
STaskStopInfo stopInfo;
} SExecTaskInfo;
};
enum {
OP_NOT_OPENED = 0x0,
......@@ -315,37 +315,39 @@ typedef struct STableMetaCacheInfo {
uint64_t cacheHit;
} STableMetaCacheInfo;
typedef struct STableScanInfo {
typedef struct STableScanBase {
STsdbReader* dataReader;
SReadHandle readHandle;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SQueryTableDataCond cond;
SAggOptrPushDownInfo pdInfo;
SColMatchInfo matchInfo;
SReadHandle readHandle;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
} STableScanBase;
typedef struct STableScanInfo {
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid;
STableMetaCacheInfo metaCache;
} STableScanInfo;
typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo;
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STsdbReader* pReader;
SReadHandle readHandle;
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
......@@ -354,27 +356,12 @@ typedef struct STableMergeScanInfo {
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SColMatchInfo matchInfo;
int32_t numOfOutput;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
......@@ -387,17 +374,17 @@ typedef struct STagScanInfo {
} STagScanInfo;
typedef struct SLastrowScanInfo {
SSDataBlock* pRes;
SReadHandle readHandle;
void* pLastrowReader;
SColMatchInfo matchInfo;
int32_t* pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
SSDataBlock* pRes;
SReadHandle readHandle;
void* pLastrowReader;
SColMatchInfo matchInfo;
int32_t* pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
......@@ -414,13 +401,6 @@ enum {
PROJECT_RETRIEVE_DONE = 0x2,
};
typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
int32_t keySize;
int64_t* pKeyBuf;
} SCatchSupporter;
typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
......@@ -504,7 +484,6 @@ typedef struct SStreamScanInfo {
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
SHashObj* pGroupIdTbNameMap;
// status for tmq
SNodeList* pGroupTags;
SNode* pTagCond;
......@@ -612,7 +591,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pChildren;
SStreamState* pState;
SWinKey delKey;
SHashObj* pGroupIdTbNameMap; // uint64_t -> char[TSDB_TABLE_NAME_LEN]
} SStreamIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -745,7 +723,6 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -761,7 +738,6 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......@@ -1046,8 +1022,8 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
......
......@@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
......@@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO after dropping table, table may not found
ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
if (pTableScanInfo->base.dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
&pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->base.dataReader == NULL) {
ASSERT(0);
}
}
STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pTableScanInfo->cond.twindows.skey = oldSkey;
tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
pTableScanInfo->base.cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
......
......@@ -1000,12 +1000,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
return TSDB_CODE_SUCCESS;
}
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
}
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
if (status == TASK_NOT_COMPLETED) {
pTaskInfo->status = status;
......@@ -1054,7 +1048,7 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
SColumnInfoData* p = NULL;
int32_t status = 0;
......@@ -1064,7 +1058,7 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
if (pColMatchInfo != NULL) {
size_t size = taosArrayGetSize(pColMatchInfo->pList);
size_t size = taosArrayGetSize(pColMatchInfo->pList);
for (int32_t i = 0; i < size; ++i) {
SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
......@@ -1336,27 +1330,14 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
pBlock->info.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
tdbFree(tbname);
}
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
......@@ -1665,55 +1646,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey);
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
size_t size = taosArrayGetSize(groupInfo);
if (size == 0) {
return true;
}
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(groupInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);
if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
return false;
}
char* pCell = colDataGetData(pColInfo, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
if (varDataLen(pCell) != varDataLen(buf[i])) {
return false;
} else {
if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
return false;
}
}
} else {
if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
return false;
}
}
}
return 0;
}
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t size = (int32_t)taosArrayGetSize(pColumnList);
for (int32_t i = 0; i < size; ++i) {
int32_t* index = taosArrayGet(pColumnList, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
char* data = colDataGetData(pColInfo, rowIndex);
memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex));
}
return true;
}
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
// todo add more information about exchange operation
int32_t type = pOperator->operatorType;
......@@ -1725,13 +1657,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag;
*order = pTableScanInfo->base.cond.order;
*scanFlag = pTableScanInfo->base.scanFlag;
return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
STableMergeScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag;
*order = pTableScanInfo->base.cond.order;
*scanFlag = pTableScanInfo->base.scanFlag;
return TSDB_CODE_SUCCESS;
} else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
......@@ -2145,7 +2077,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break;
}
doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo );
doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
if (fillResult->info.rows > 0) {
break;
}
......@@ -2372,14 +2304,14 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
}
code = appendDownstream(pOperator, &downstream, 1);
......@@ -2744,7 +2676,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
......@@ -2762,14 +2694,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
return NULL;
}
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
pTaskInfo);
......@@ -3351,13 +3283,13 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
if (pBlock->info.groupId == 0) {
pBlock->info.groupId = pPos->groupId;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
tdbFree(tbname);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pPos->groupId) {
......@@ -3443,30 +3375,13 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
if (pBlock->info.groupId == 0) {
pBlock->info.groupId = pKey->groupId;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
if (tbname != NULL) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
pBlock->info.parTbName[0] = 0;
}
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
ASSERT(0);
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
tdbFree(tbname);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pKey->groupId) {
......
......@@ -1450,7 +1450,6 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
}
}
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
/*taosHashRemove(pInfo->pGroupIdTbNameMap, &pWinKey->groupId, sizeof(int64_t));*/
}
}
return TSDB_CODE_SUCCESS;
......@@ -1547,7 +1546,8 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
uint64_t uid = 0;
for (int32_t i = *index; i < size; i++) {
SWinKey* pWin = taosArrayGet(pWins, i);
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pWin->groupId, sizeof(int64_t));
void* tbname = NULL;
streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
if (tbname == NULL) {
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
} else {
......@@ -1555,6 +1555,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
}
tdbFree(tbname);
(*index)++;
}
}
......@@ -1617,7 +1618,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param);
}
......@@ -2545,9 +2545,11 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey;
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
pScanInfo->base.cond.twindows = pInfo->win;
pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL;
}
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
......@@ -3153,11 +3155,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
}
ASSERT(pBlock->info.type != STREAM_INVERT);
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
......@@ -3372,9 +3369,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
......@@ -3425,7 +3419,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pWinBlock);
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param);
}
......@@ -3857,30 +3850,18 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap = NULL;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else {
ASSERT(0);
}
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
void* tbname = NULL;
streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
if (tbname == NULL) {
/*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/
colDataAppendNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
/*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/
}
tdbFree(tbname);
pBlock->info.rows += 1;
}
if ((*Ite) == NULL) {
......@@ -4053,19 +4034,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/
/*pInfo, pInfo->pGroupIdTbNameMap);*/
}
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
......@@ -4209,8 +4177,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
......@@ -4285,12 +4251,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, "semi session recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
// gap must be 0
......@@ -4418,7 +4378,6 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
tSimpleHashCleanup(pInfo->pSeDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param);
}
......@@ -4612,12 +4571,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, "single state recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
......@@ -4731,9 +4684,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
......@@ -5384,12 +5334,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.parTbName[0]) {
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
TSDB_TABLE_NAME_LEN);
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
}
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
......@@ -5547,9 +5491,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->delKey.ts = INT64_MAX;
pInfo->delKey.groupId = 0;
pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet =
......
......@@ -1704,7 +1704,8 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
}
qDebug("GROUP Num:%u", info->groupNum);
for (uint32_t i = 0; i < info->groupNum; ++i) {
uint32_t maxDbgGrpNum = TMIN(info->groupNum, 1000);
for (uint32_t i = 0; i < maxDbgGrpNum; ++i) {
SFilterGroup *group = &info->groups[i];
qDebug("Group%d : unit num[%u]", i, group->unitNum);
......@@ -3920,6 +3921,10 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
} else {
SColumnNode *refNode = (SColumnNode *)node->pLeft;
SNodeListNode *listNode = (SNodeListNode *)node->pRight;
if (LIST_LENGTH(listNode->pNodeList) > 10) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
int32_t type = vectorGetConvertType(refNode->node.resType.type, listNode->dataType.type);
if (0 != type && type != refNode->node.resType.type) {
stat->scalarMode = true;
......
......@@ -16,6 +16,7 @@
#include "executor.h"
#include "streamInc.h"
#include "tcommon.h"
#include "tcompare.h"
#include "ttimer.h"
// todo refactor
......@@ -113,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
if (pState->pTdbState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
streamStateDestroy(pState);
return NULL;
}
char statePath[1024];
if (!specPath) {
......@@ -121,26 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
}
if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) {
if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) {
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
0) < 0) {
goto _err;
}
// todo refactor
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) {
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
&pState->pTdbState->pFillStateDb, 0) < 0) {
goto _err;
}
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db,
&pState->pSessionStateDb, 0) < 0) {
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
&pState->pTdbState->pSessionStateDb, 0) < 0) {
goto _err;
}
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) {
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
&pState->pTdbState->pFuncStateDb, 0) < 0) {
goto _err;
}
if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
&pState->pTdbState->pParNameDb, 0) < 0) {
goto _err;
}
......@@ -148,115 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto _err;
}
pState->pOwner = pTask;
pState->pTdbState->pOwner = pTask;
return pState;
_err:
tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
tdbTbClose(pState->pTdbState->pStateDb);
tdbTbClose(pState->pTdbState->pFuncStateDb);
tdbTbClose(pState->pTdbState->pFillStateDb);
tdbTbClose(pState->pTdbState->pSessionStateDb);
tdbTbClose(pState->pTdbState->pParNameDb);
tdbClose(pState->pTdbState->db);
streamStateDestroy(pState);
return NULL;
}
void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn);
tdbPostCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb);
tdbClose(pState->db);
tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn);
tdbTbClose(pState->pTdbState->pStateDb);
tdbTbClose(pState->pTdbState->pFuncStateDb);
tdbTbClose(pState->pTdbState->pFillStateDb);
tdbTbClose(pState->pTdbState->pSessionStateDb);
tdbTbClose(pState->pTdbState->pParNameDb);
tdbClose(pState->pTdbState->db);
taosMemoryFree(pState);
streamStateDestroy(pState);
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
tdbTxnClose(&pState->txn);
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
tdbTxnClose(&pState->pTdbState->txn);
return -1;
}
return 0;
}
int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) {
if (tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1;
}
if (tdbPostCommit(pState->db, &pState->txn) < 0) {
if (tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
memset(&pState->pTdbState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) {
if (tdbAbort(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
memset(&pState->pTdbState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->pTdbState->txn);
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn);
return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), &pState->pTdbState->txn);
}
// todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->pTdbState->txn);
}
// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->pTdbState->txn);
}
// todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
}
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
}
// todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn);
return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pState->pTdbState->txn);
}
int32_t streamStateClear(SStreamState* pState) {
......@@ -280,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn);
return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), &pState->pTdbState->txn);
}
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
......@@ -306,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
int32_t c = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
......@@ -322,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL);
tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
int32_t c = 0;
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
......@@ -414,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -440,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
if (!pCur) {
return NULL;
}
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -465,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
if (pCur == NULL) {
return NULL;
}
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -512,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); }
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
&pState->pTdbState->txn);
}
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
......@@ -535,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->pTdbState->txn);
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
......@@ -544,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -571,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -599,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......@@ -666,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
return -1;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return -1;
}
......@@ -812,6 +830,22 @@ _end:
return res;
}
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
&pState->pTdbState->txn);
return 0;
}
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
int32_t len;
return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
}
void streamStateDestroy(SStreamState* pState) {
taosMemoryFreeClear(pState->pTdbState);
taosMemoryFreeClear(pState);
}
#if 0
char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......@@ -819,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
......
......@@ -436,8 +436,15 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
}
int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->replicaNum > 1) {
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
if (pSyncNode->peersNum == 2) {
SyncIndex matchIndex0 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
SyncIndex matchIndex1 = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[1]));
if (matchIndex1 > matchIndex0) {
newLeader = (pSyncNode->peersNodeInfo)[1];
}
}
ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
......@@ -1396,7 +1403,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// reset sender
bool reset = false;
for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j]) && oldSenders[j] != NULL) {
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
......@@ -1413,6 +1420,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
sNTrace(pSyncNode, "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex,
i, host, port, (pSyncNode->senders)[i], reset);
break;
}
}
}
......
......@@ -44,7 +44,13 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
return code;
}
void tmsgSendRsp(SRpcMsg* pMsg) { return (*defaultMsgCb.sendRspFp)(pMsg); }
void tmsgSendRsp(SRpcMsg* pMsg) {
#if 1
rpcSendResponse(pMsg);
#else
return (*defaultMsgCb.sendRspFp)(pMsg);
#endif
}
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { (*defaultMsgCb.sendRedirectRspFp)(pMsg, pNewEpSet); }
......
......@@ -462,8 +462,6 @@ static void uvStartSendResp(SSvrMsg* smsg) {
if (pConn->broken == true) {
// persist by
destroySmsg(smsg);
// transFreeMsg(smsg->msg.pCont);
// taosMemoryFree(smsg);
transUnrefSrvHandle(pConn);
return;
}
......@@ -1234,7 +1232,9 @@ int transReleaseSrvHandle(void* handle) {
m->type = Release;
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
transReleaseExHandle(transGetRefMgt(), refId);
return 0;
......@@ -1269,7 +1269,9 @@ int transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
transReleaseExHandle(transGetRefMgt(), refId);
return 0;
......@@ -1303,7 +1305,9 @@ int transRegisterMsg(const STransMsg* msg) {
STrans* pTransInst = pThrd->pTransInst;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
transReleaseExHandle(transGetRefMgt(), refId);
return 0;
......
......@@ -344,6 +344,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_ALREADY_EXIST, "Stable already exists
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, "Table schema is old")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, "Table already exists in other stables")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
......@@ -56,7 +56,7 @@
,,y,script,./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v2.sim
,,y,script,./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v3.sim
,,y,script,./test.sh -f tsim/dnode/vnode_clean.sim
,,,script,./test.sh -f tsim/dnode/use_dropped_dnode.sim
,,y,script,./test.sh -f tsim/dnode/use_dropped_dnode.sim
,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica1.sim
,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica3.sim
,,y,script,./test.sh -f tsim/import/basic.sim
......@@ -113,7 +113,7 @@
,,y,script,./test.sh -f tsim/parser/first_last.sim
,,y,script,./test.sh -f tsim/parser/fill_stb.sim
,,y,script,./test.sh -f tsim/parser/interp.sim
#,,y,script,./test.sh -f tsim/parser/limit2.sim
,,y,script,./test.sh -f tsim/parser/limit2.sim
,,y,script,./test.sh -f tsim/parser/fourArithmetic-basic.sim
,,y,script,./test.sh -f tsim/parser/function.sim
,,y,script,./test.sh -f tsim/parser/groupby-basic.sim
......@@ -136,8 +136,8 @@
,,y,script,./test.sh -f tsim/parser/lastrow.sim
,,y,script,./test.sh -f tsim/parser/lastrow2.sim
,,y,script,./test.sh -f tsim/parser/like.sim
,,,script,./test.sh -f tsim/parser/limit.sim
,,,script,./test.sh -f tsim/parser/limit1.sim
,,y,script,./test.sh -f tsim/parser/limit.sim
,,y,script,./test.sh -f tsim/parser/limit1.sim
,,y,script,./test.sh -f tsim/parser/mixed_blocks.sim
,,y,script,./test.sh -f tsim/parser/nchar.sim
,,y,script,./test.sh -f tsim/parser/nestquery.sim
......@@ -163,7 +163,7 @@
,,y,script,./test.sh -f tsim/parser/timestamp.sim
,,y,script,./test.sh -f tsim/parser/top_groupby.sim
,,y,script,./test.sh -f tsim/parser/topbot.sim
,,,script,./test.sh -f tsim/parser/union.sim
,,y,script,./test.sh -f tsim/parser/union.sim
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
,,y,script,./test.sh -f tsim/parser/where.sim
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
......@@ -176,11 +176,11 @@
,,y,script,./test.sh -f tsim/query/udf.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,,script,./test.sh -f tsim/mnode/basic1.sim
,,,script,./test.sh -f tsim/mnode/basic2.sim
,,,script,./test.sh -f tsim/mnode/basic3.sim
,,,script,./test.sh -f tsim/mnode/basic4.sim
,,,script,./test.sh -f tsim/mnode/basic5.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic2.sim
,,y,script,./test.sh -f tsim/mnode/basic3.sim
,,y,script,./test.sh -f tsim/mnode/basic4.sim
,,y,script,./test.sh -f tsim/mnode/basic5.sim
,,y,script,./test.sh -f tsim/show/basic.sim
,,y,script,./test.sh -f tsim/table/autocreate.sim
,,y,script,./test.sh -f tsim/table/basic1.sim
......@@ -278,7 +278,7 @@
,,y,script,./test.sh -f tsim/stable/values.sim
,,y,script,./test.sh -f tsim/stable/vnode3.sim
,,y,script,./test.sh -f tsim/stable/metrics_idx.sim
,,,script,./test.sh -f tsim/sma/drop_sma.sim
,,n,script,./test.sh -f tsim/sma/drop_sma.sim
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
......@@ -291,7 +291,7 @@
,,n,script,./test.sh -f tsim/valgrind/checkError7.sim
,,n,script,./test.sh -f tsim/valgrind/checkError8.sim
,,n,script,./test.sh -f tsim/valgrind/checkUdf.sim
,,,script,./test.sh -f tsim/vnode/replica3_basic.sim
,,y,script,./test.sh -f tsim/vnode/replica3_basic.sim
,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim
,,y,script,./test.sh -f tsim/vnode/replica3_vgroup.sim
,,y,script,./test.sh -f tsim/vnode/replica3_many.sim
......@@ -424,6 +424,7 @@
,,,system-test,python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,,system-test,python3 ./test.py -f 1-insert/alter_stable.py
,,,system-test,python3 ./test.py -f 1-insert/alter_table.py
,,,system-test,python3 ./test.py -f 1-insert/boundary.py
,,,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
,,,system-test,python3 ./test.py -f 1-insert/table_comment.py
,,,system-test,python3 ./test.py -f 1-insert/time_range_wise.py
......@@ -607,6 +608,8 @@
,,,system-test,python3 ./test.py -f 2-query/upper.py -R
,,,system-test,python3 ./test.py -f 2-query/varchar.py
,,,system-test,python3 ./test.py -f 2-query/varchar.py -R
,,,system-test,python3 ./test.py -f 2-query/case_when.py
,,,system-test,python3 ./test.py -f 2-query/case_when.py -R
,,,system-test,python3 ./test.py -f 1-insert/update_data.py
,,,system-test,python3 ./test.py -f 1-insert/tb_100w_data_order.py
,,,system-test,python3 ./test.py -f 1-insert/delete_stable.py
......@@ -813,6 +816,7 @@
,,,system-test,python3 ./test.py -f 2-query/last_row.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/sml.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/case_when.py -Q 2
,,,system-test,python3 ./test.py -f 2-query/between.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/distinct.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/varchar.py -Q 3
......@@ -906,6 +910,7 @@
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/sml.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/interp.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/case_when.py -Q 3
,,,system-test,python3 ./test.py -f 2-query/between.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/distinct.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/varchar.py -Q 4
......@@ -998,6 +1003,7 @@
,,,system-test,python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/sml.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/interp.py -Q 4
,,,system-test,python3 ./test.py -f 2-query/case_when.py -Q 4
#develop test
,,,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
......
......@@ -116,6 +116,7 @@ class TDDnode:
self.deployed = 0
self.testCluster = False
self.valgrind = 0
self.asan = False
self.remoteIP = ""
self.cfgDict = {
"monitor": "0",
......@@ -158,6 +159,11 @@ class TDDnode:
def setValgrind(self, value):
self.valgrind = value
def setAsan(self, value):
self.asan = value
if value:
self.execPath = os.path.abspath(self.path + "/tests/script/sh/exec.sh")
def getDataSize(self):
totalSize = 0
......@@ -383,8 +389,14 @@ class TDDnode:
cmd = "mintty -h never %s -c %s" % (
binPath, self.cfgDir)
else:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
if self.asan:
asanDir = "%s/sim/asan/dnode%d.asan" % (
self.path, self.index)
cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
binPath, self.cfgDir, asanDir)
else:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
else:
valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\" --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
......@@ -444,8 +456,14 @@ class TDDnode:
tdLog.exit("dnode:%d is not deployed" % (self.index))
if self.valgrind == 0:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
if self.asan:
asanDir = "%s/sim/asan/dnode%d.asan" % (
self.path, self.index)
cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
binPath, self.cfgDir, asanDir)
else:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
binPath, self.cfgDir)
else:
valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\" --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
......@@ -464,6 +482,12 @@ class TDDnode:
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
def stop(self):
if self.asan:
stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
tdLog.info("execute script: " + stopCmd)
os.system(stopCmd)
return
if (not self.remoteIP == ""):
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
tdLog.info("stop dnode%d"%self.index)
......@@ -501,6 +525,12 @@ class TDDnode:
def stoptaosd(self):
if self.asan:
stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
tdLog.info("execute script: " + stopCmd)
os.system(stopCmd)
return
if (not self.remoteIP == ""):
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
tdLog.info("stop dnode%d"%self.index)
......@@ -534,6 +564,13 @@ class TDDnode:
tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
def forcestop(self):
if self.asan:
stopCmd = "%s -s stop -n dnode%d -x SIGKILL" + \
(self.execPath, self.index)
tdLog.info("execute script: " + stopCmd)
os.system(stopCmd)
return
if (not self.remoteIP == ""):
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].forcestop()"%(self.index-1,self.index-1))
return
......@@ -606,6 +643,7 @@ class TDDnodes:
self.simDeployed = False
self.testCluster = False
self.valgrind = 0
self.asan = False
self.killValgrind = 1
def init(self, path, remoteIP = ""):
......@@ -629,6 +667,13 @@ class TDDnodes:
def setValgrind(self, value):
self.valgrind = value
def setAsan(self, value):
self.asan = value
if value:
self.stopDnodesPath = os.path.abspath(self.path + "/tests/script/sh/stop_dnodes.sh")
self.stopDnodesSigintPath = os.path.abspath(self.path + "/tests/script/sh/sigint_stop_dnodes.sh")
tdLog.info("run in address sanitizer mode")
def setKillValgrind(self, value):
self.killValgrind = value
......@@ -642,6 +687,7 @@ class TDDnodes:
self.check(index)
self.dnodes[index - 1].setTestCluster(self.testCluster)
self.dnodes[index - 1].setValgrind(self.valgrind)
self.dnodes[index - 1].setAsan(self.asan)
self.dnodes[index - 1].deploy(updatecfgDict)
def cfg(self, index, option, value):
......@@ -692,8 +738,22 @@ class TDDnodes:
if index < 1 or index > 10:
tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
def StopAllSigint(self):
tdLog.info("stop all dnodes sigint")
if self.asan:
tdLog.info("execute script: %s" % self.stopDnodesSigintPath)
os.system(self.stopDnodesSigintPath)
tdLog.info("execute finished")
return
def stopAll(self):
tdLog.info("stop all dnodes")
if self.asan:
tdLog.info("execute script: %s" % self.stopDnodesPath)
os.system(self.stopDnodesPath)
tdLog.info("execute finished")
return
if (not self.dnodes[0].remoteIP == ""):
self.dnodes[0].remoteExec(self.dnodes[0].cfgDict, "for i in range(len(tdDnodes.dnodes)):\n tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
return
......
......@@ -33,13 +33,13 @@ class TDLog:
print("\033[1;36m%s %s\033[0m" % (datetime.datetime.now(), err))
def success(self, info):
printf("\033[1;32m%s %s\033[0m" % (datetime.datetime.now(), info))
print("\033[1;32m%s %s\033[0m" % (datetime.datetime.now(), info))
def notice(self, err):
printf("\033[1;33m%s %s\033[0m" % (datetime.datetime.now(), err))
print("\033[1;33m%s %s\033[0m" % (datetime.datetime.now(), err))
def exit(self, err):
printf("\033[1;31m%s %s\033[0m" % (datetime.datetime.now(), err))
print("\033[1;31m%s %s\033[0m" % (datetime.datetime.now(), err))
sys.exit(1)
def printNoPrefix(self, info):
......
......@@ -73,8 +73,15 @@ class TDSql:
expectErrNotOccured = True
try:
self.cursor.execute(sql)
except BaseException:
except BaseException as e:
expectErrNotOccured = False
caller = inspect.getframeinfo(inspect.stack()[1][0])
self.error_info = repr(e)
# print(error_info)
# self.error_info = error_info[error_info.index('(')+1:-1].split(",")[0].replace("'","")
# self.error_info = (','.join(error_info.split(",")[:-1]).split("(",1)[1:][0]).replace("'","")
# print("!!!!!!!!!!!!!!",self.error_info)
if expectErrNotOccured:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql))
......@@ -83,6 +90,8 @@ class TDSql:
self.queryCols = 0
self.queryResult = None
tdLog.info("sql:%s, expect error occured" % (sql))
return self.error_info
def query(self, sql, row_tag=None,queryTimes=10):
self.sql = sql
......
......@@ -15,7 +15,7 @@ else
fi
TAOS_DIR=`pwd`
LOG_DIR=$TAOS_DIR/sim/tsim/asan
LOG_DIR=$TAOS_DIR/sim/asan
error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l`
memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l`
......
......@@ -80,7 +80,7 @@ LOG_DIR=$NODE_DIR/log
DATA_DIR=$NODE_DIR/data
MGMT_DIR=$NODE_DIR/data/mgmt
TSDB_DIR=$NODE_DIR/data/tsdb
ASAN_DIR=$SIM_DIR/tsim/asan
ASAN_DIR=$SIM_DIR/asan
TAOS_CFG=$NODE_DIR/cfg/taos.cfg
echo ------------ $EXEC_OPTON $NODE_NAME
......
#!/bin/sh
set +e
#set -x
export LD_PRELOAD=
UNAME_BIN=`which uname`
OS_TYPE=`$UNAME_BIN`
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do
#echo "Killing taosd processes " $PID
kill $PID
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done
#!/bin/sh
set +e
#set -x
UNAME_BIN=`which uname`
OS_TYPE=`$UNAME_BIN`
export LD_PRELOAD=
PID=`ps -ef|grep /usr/bin/taosd | grep -v grep | awk '{print $2}'`
if [ -n "$PID" ]; then
echo systemctl stop taosd
......@@ -22,16 +26,3 @@ while [ -n "$PID" ]; do
fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
done
PID=`ps -ef|grep -w tarbitrator | grep -v grep | awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
pkill -9 tarbitrator
if [ "$OS_TYPE" != "Darwin" ]; then
fuser -k -n tcp 6040
else
lsof -nti:6040 | xargs kill -9
fi
PID=`ps -ef|grep -w tarbitrator | grep -v grep | awk '{print $2}'`
done
......@@ -66,16 +66,13 @@ else
fi
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
declare -x SIM_DIR=$TOP_DIR/sim
PROGRAM=$BUILD_DIR/build/bin/tsim
PRG_DIR=$SIM_DIR/tsim
CFG_DIR=$PRG_DIR/cfg
LOG_DIR=$PRG_DIR/log
DATA_DIR=$PRG_DIR/data
ASAN_DIR=$PRG_DIR/asan
ASAN_DIR=$SIM_DIR/asan
chmod -R 777 $PRG_DIR
echo "------------------------------------------------------------------------"
......@@ -141,11 +138,14 @@ if [ -n "$FILE_NAME" ]; then
echo "AsanDir:" $ASAN_DIR/tsim.asan
eval $PROGRAM -c $CFG_DIR -f $FILE_NAME 2> $ASAN_DIR/tsim.asan
result=$?
echo "Execute result: " $result
echo "Execute result:" $result
if [ $result -eq 0 ]; then
$CODE_DIR/sh/checkAsan.sh
else
echo "TSIM has asan errors"
sleep 1
$CODE_DIR/sh/checkAsan.sh
exit 1
fi
fi
......
......@@ -138,10 +138,10 @@ while $i < 10
if $data[0][4] != leader then
return -1
endi
if $data[0][6] != follower then
if $data[0][6] == leader then
return -1
endi
if $data[0][8] != follower then
if $data[0][8] == leader then
return -1
endi
endw
......
......@@ -321,55 +321,41 @@ endi
### [TBASE-350]
## stb + interval + fill + group by + limit offset
sql select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') limit 2 offset 10
sql select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') limit 2 offset 10
if $rows != 2 then
return -1
endi
#add one more test case
sql select max(c1), last(c8) from lm2_db0.lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(linear) limit 10 offset 4089;"
$limit = 5
$offset = $rowNum * 2
$offset = $offset - 2
sql select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit $limit offset $offset
if $rows != $tbNum then
sql select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit $limit offset $offset
if $rows != 1 then
return -1
endi
if $data00 != @18-11-25 19:30:00.000@ then
if $data00 != 9 then
return -1
endi
if $data01 != 9 then
return -1
endi
if $data12 != 9 then
if $data02 != 9.000000000 then
return -1
endi
if $data23 != 9.000000000 then
if $data03 != 9.000000000 then
return -1
endi
if $data34 != 9.000000000 then
if $data04 != 1 then
return -1
endi
if $data45 != 1 then
if $data05 != binary9 then
return -1
endi
if $data56 != binary9 then
return -1
endi
if $data68 != 6 then
return -1
endi
if $data72 != -2 then
return -1
endi
if $data84 != -2.000000000 then
return -1
endi
if $data98 != 9 then
if $data06 != nchar9 then
return -1
endi
#add one more test case
sql select max(c1), last(c8) from lm2_db0.lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(linear) limit 10 offset 4089;"
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import math
from random import randint
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.boundary import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.boundary = DataBoundary()
self.dbname_length_boundary = self.boundary.DBNAME_MAX_LENGTH
self.tbname_length_boundary = self.boundary.TBNAME_MAX_LENGTH
self.stbname_length_boundary = self.boundary.STBNAME_MAX_LENGTH
self.colname_length_boundary = self.boundary.COL_KEY_MAX_LENGTH
self.tagname_length_boundary = self.boundary.TAG_KEY_MAX_LENGTH
self.username_length_boundary = 23
self.password_length_boundary = 128
def dbname_length_check(self):
dbname_length = randint(1,self.dbname_length_boundary-1)
for dbname in [tdCom.get_long_name(self.dbname_length_boundary),tdCom.get_long_name(dbname_length)]:
tdSql.execute(f'create database if not exists {dbname}')
tdSql.query(f'select name from information_schema.ins_databases where name = "{dbname}"')
tdSql.checkEqual(tdSql.queryResult[0][0],dbname)
tdSql.execute(f'drop database if exists {dbname}')
dbname = tdCom.get_long_name(self.dbname_length_boundary+1)
tdSql.error(f'create database if not exists {dbname}')
if "Invalid identifier name" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
def tbname_length_check(self):
tdSql.prepare()
tdSql.execute('use db')
tbname_length = randint(1,self.tbname_length_boundary-1)
tdSql.execute(f'create table stb (ts timestamp,c0 int) tags(t0 int)')
for tbname in [tdCom.get_long_name(self.tbname_length_boundary),tdCom.get_long_name(tbname_length)]:
tdSql.execute(f'create table {tbname} using stb tags(1)')
tdSql.query(f'select table_name from information_schema.ins_tables where table_name = "{tbname}"')
tdSql.checkEqual(tdSql.queryResult[0][0],tbname)
tdSql.execute(f'drop table {tbname}')
tbname = tdCom.get_long_name(self.tbname_length_boundary+1)
tdSql.error(f'create table {tbname} using stb tags(1)')
if "Invalid identifier name" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
stbname_length = randint(1,self.stbname_length_boundary-1)
for stbname in [tdCom.get_long_name(self.stbname_length_boundary),tdCom.get_long_name(stbname_length)]:
tdSql.execute(f'create table {stbname} (ts timestamp,c0 int) tags(t0 int)')
tdSql.query(f'select stable_name from information_schema.ins_stables where stable_name = "{stbname}"')
tdSql.checkEqual(tdSql.queryResult[0][0],stbname)
tdSql.execute(f'drop table {stbname}')
stbname = tdCom.get_long_name(self.stbname_length_boundary+1)
tdSql.error(f'create table {stbname} (ts timestamp,c0 int) tags(t0 int)')
print(tdSql.error_info)
if "Invalid identifier name" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
tdSql.execute('drop database db')
def colname_length_check(self):
tdSql.prepare()
tdSql.execute('use db')
column_name_length = randint(1,self.colname_length_boundary-1)
for colname in [tdCom.get_long_name(column_name_length),tdCom.get_long_name(self.colname_length_boundary)]:
stbname = tdCom.get_long_name(3)
ntbname = tdCom.get_long_name(4)
tdSql.execute(f'create table {stbname} (ts timestamp,{colname} int) tags(t0 int)')
tdSql.query(f'describe {stbname}')
tdSql.checkEqual(tdSql.queryResult[1][0],colname)
tdSql.execute(f'create table {ntbname} (ts timestamp,{colname} int)')
tdSql.query(f'describe {ntbname}')
tdSql.checkEqual(tdSql.queryResult[1][0],colname)
colname = tdCom.get_long_name(self.colname_length_boundary+1)
tdSql.error(f'create table stb (ts timestamp,{colname} int) tags(t0 int)')
if "Invalid identifier name" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
tdSql.execute('drop database db')
def tagname_length_check(self):
tdSql.prepare()
tdSql.execute('use db')
tag_name_length = randint(1,self.tagname_length_boundary-1)
for tagname in (tdCom.get_long_name(tag_name_length),tdCom.get_long_name(self.tagname_length_boundary)):
stbname = tdCom.get_long_name(3)
tdSql.execute(f'create table {stbname} (ts timestamp,c0 int) tags({tagname} int)')
tdSql.query(f'describe {stbname}')
tdSql.checkEqual(tdSql.queryResult[-1][0],tagname)
tagname = tdCom.get_long_name(self.tagname_length_boundary+1)
tdSql.error(f'create table {stbname} (ts timestamp,c0 int) tags({tagname} int)')
if "Invalid identifier name" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
tdSql.execute('drop database db')
def username_length_check(self):
username_length = randint(1,self.username_length_boundary-1)
for username in [tdCom.get_long_name(username_length),tdCom.get_long_name(self.username_length_boundary)]:
tdSql.execute(f'create user {username} pass "123"')
tdSql.query('show users')
for user in tdSql.queryResult:
if user[0].lower() != 'root':
tdSql.checkEqual(user[0],username)
tdSql.execute(f'drop user {username}')
username = tdCom.get_long_name(self.username_length_boundary+1)
tdSql.error(f'create user {username} pass "123"')
if "Name or password too long" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
def password_length_check(self):
password_length = randint(1,self.password_length_boundary-1)
for password in [tdCom.get_long_name(password_length),tdCom.get_long_name(self.password_length_boundary)]:
username = tdCom.get_long_name(3)
tdSql.execute(f'create user {username} pass "{password}"')
password = tdCom.get_long_name(self.password_length_boundary+1)
tdSql.error(f'create user {username} pass "{password}"')
if "Name or password too long" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
def sql_length_check(self):
insert_rows = 1021
tdSql.prepare()
tdSql.execute('use db')
tdSql.execute('create table ntb (ts timestamp,c0 binary(1013))')
values_sql = ''
value = tdCom.get_long_name(1013)
for num in range(insert_rows):
values_sql += f' (now+{num}s,"{value}")'
value = tdCom.get_long_name(65)
values_sql += f"(now-1s,'{value}')"
tdSql.execute(f'insert into ntb values{values_sql}')
tdSql.query('select * from ntb')
tdSql.checkRows(insert_rows+1)
tdSql.execute('create table ntb1 (ts timestamp,c0 binary(1013))')
tdSql.error(f'insert into ntb1 values{values_sql};')
print(tdSql.error_info)
if "SQL statement too long" in tdSql.error_info:
tdLog.info("error info is true!")
else:
tdLog.exit("error info is not true")
tdSql.execute('drop database db')
def run(self):
self.dbname_length_check()
self.tbname_length_check()
self.colname_length_check()
self.tagname_length_check()
self.username_length_check()
self.password_length_check()
self.sql_length_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -23,7 +23,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(),logSql)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.dbname = 'db_test'
self.ntbname = 'ntb'
......
此差异已折叠。
......@@ -73,8 +73,9 @@ if __name__ == "__main__":
createDnodeNums = 1
restful = False
replicaVar = 1
asan = False
independentMnode = True
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RD:n:i:', [
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RD:n:i:a', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','adaptercfgupdate','replicaVar','independentMnode'])
for key, value in opts:
if key in ['-h', '--help']:
......@@ -99,6 +100,7 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-D taosadapter update cfg dict ')
tdLog.printNoPrefix('-n the number of replicas')
tdLog.printNoPrefix('-i independentMnode Mnode')
tdLog.printNoPrefix('-a address sanitizer mode')
sys.exit(0)
......@@ -167,6 +169,9 @@ if __name__ == "__main__":
if key in ['-R', '--restful']:
restful = True
if key in ['-a', '--asan']:
asan = True
if key in ['-D', '--adaptercfgupdate']:
try:
adaptercfgupdate = eval(base64.b64decode(value.encode()).decode())
......@@ -387,6 +392,7 @@ if __name__ == "__main__":
tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind)
tdDnodes.setAsan(asan)
tdDnodes.stopAll()
is_test_framework = 0
key_word = 'tdCases.addLinux'
......@@ -547,4 +553,7 @@ if __name__ == "__main__":
if conn is not None:
conn.close()
if asan:
tdDnodes.StopAllSigint()
tdLog.info("address sanitizer mode finished")
sys.exit(0)
#!/bin/bash
##################################################
#
# Do simulation test
#
##################################################
set +e
#set -x
UNAME_BIN=`which uname`
OS_TYPE=`$UNAME_BIN`
cd .
# Get responsible directories
CODE_DIR=`dirname $0`
CODE_DIR=`pwd`
IN_TDINTERNAL="community"
if [[ "$CODE_DIR" == *"$IN_TDINTERNAL"* ]]; then
cd ../../..
else
cd ../../
fi
TOP_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
if [[ "$OS_TYPE" != "Darwin" ]]; then
cut_opt="--field="
else
cut_opt="-f "
fi
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' ${cut_opt}2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' ${cut_opt}2`
fi
declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR
declare -x SIM_DIR=$TOP_DIR/sim
PROGRAM=$BUILD_DIR/build/bin/tsim
PRG_DIR=$SIM_DIR/tsim
ASAN_DIR=$SIM_DIR/asan
SYSTEM_TEST_DIR=$TOP_DIR/tests/system-test
chmod -R 777 $PRG_DIR
echo "------------------------------------------------------------------------"
echo "Start TDengine Testing Case ..."
echo "BUILD_DIR: $BUILD_DIR"
echo "SYSTEM_TEST_DIR : $SYSTEM_TEST_DIR"
echo "SIM_DIR : $SIM_DIR"
echo "CODE_DIR : $CODE_DIR"
echo "ASAN_DIR : $ASAN_DIR"
rm -rf $SIM_DIR/*
mkdir -p $PRG_DIR
mkdir -p $ASAN_DIR
cd $SYSTEM_TEST_DIR
ulimit -n 600000
ulimit -c unlimited
#sudo sysctl -w kernel.core_pattern=$TOP_DIR/core.%p.%e
echo "ExcuteCmd:" $*
echo "AsanDir:" $ASAN_DIR/psim.asan
export LD_PRELOAD=libasan.so.5
$* -a 2> $ASAN_DIR/psim.asan
result=$?
echo "Execute result:" $result
if [ $result -eq 0 ]; then
$CODE_DIR/sh/checkAsan.sh
else
exit 1
fi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册