提交 a7cfebfe 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch '3.0' into feat/sangshuduo/TD-14141-update-taostools-for3.0

...@@ -22,6 +22,7 @@ mac/ ...@@ -22,6 +22,7 @@ mac/
.mypy_cache .mypy_cache
*.tmp *.tmp
*.swp *.swp
*.swo
*.orig *.orig
src/connector/nodejs/node_modules/ src/connector/nodejs/node_modules/
src/connector/nodejs/out/ src/connector/nodejs/out/
......
...@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond { ...@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
int32_t type; // data block load type: int32_t type; // data block load type:
// int32_t numOfTWindows; STimeWindow twindows;
STimeWindow twindows; int64_t startVersion;
int64_t startVersion; int64_t endVersion;
int64_t endVersion;
} SQueryTableDataCond; } SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
......
...@@ -158,7 +158,7 @@ static const SSysDbTableSchema userTagsSchema[] = { ...@@ -158,7 +158,7 @@ static const SSysDbTableSchema userTagsSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "tag_type", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
......
...@@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) { ...@@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg)); memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
if (pCfg->level == 0 && pCfg->primary == 1) { if (pCfg->level == 0 && pCfg->primary == 1) {
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX); tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
if (taosMulMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
return -1;
}
} }
if (taosMulMkDir(pCfg->dir) != 0) { if (taosMulMkDir(pCfg->dir) != 0) {
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr()); uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
...@@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) { ...@@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
if (tsDataDir[0] == 0) { if (tsDataDir[0] == 0) {
if (pItem->str != NULL) { if (pItem->str != NULL) {
taosAddDataDir(0, pItem->str, 0, 1); taosAddDataDir(tsDiskCfgNum, pItem->str, 0, 1);
tstrncpy(tsDataDir, pItem->str, PATH_MAX); tstrncpy(tsDataDir, pItem->str, PATH_MAX);
if (taosMulMkDir(tsDataDir) != 0) { if (taosMulMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr()); uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
return -1; return -1;
} }
tsDiskCfgNum++;
} else { } else {
uError("datadir not set"); uError("datadir not set");
return -1; return -1;
......
...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock); taosWUnLockLatch(&pConn->queryLock);
......
...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader; typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_ORDER 1 #define TIMEWINDOW_RANGE_CONTAINED 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2 #define TIMEWINDOW_RANGE_EXTERNAL 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
......
...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow* pResultRow); void initResultRow(SResultRow* pResultRow);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow);
...@@ -96,6 +94,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -96,6 +94,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return pRow; return pRow;
} }
static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
void* pPage = getBufPage(pBuf, pos->pageId);
setBufPageDirty(pPage, true);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
......
...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo { ...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder* pRecoder; SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize; uint64_t winInfoSize;
uint64_t tableInfoSize; uint64_t tableInfoSize;
uint64_t hashSize; uint64_t hashSize;
...@@ -352,6 +351,11 @@ typedef enum EStreamScanMode { ...@@ -352,6 +351,11 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_DATAREADER_RANGE, STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode; } EStreamScanMode;
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
typedef struct SCatchSupporter { typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
...@@ -549,6 +553,7 @@ typedef struct SProjectOperatorInfo { ...@@ -549,6 +553,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo limitInfo; SLimitInfo limitInfo;
bool mergeDataBlocks; bool mergeDataBlocks;
SSDataBlock* pFinalRes; SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SIndefOperatorInfo { typedef struct SIndefOperatorInfo {
......
...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { ...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
} }
} }
void closeAllResultRows(SResultRowInfo* pResultRowInfo) {
// do nothing
}
bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { ...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray* createSortInfo(SNodeList* pNodeList) { SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = 0; size_t numOfCols = 0;
if (pNodeList != NULL) { if (pNodeList != NULL) {
numOfCols = LIST_LENGTH(pNodeList); numOfCols = LIST_LENGTH(pNodeList);
} else { } else {
numOfCols = 0; numOfCols = 0;
} }
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) { if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData idata = SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
} }
} }
#ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num);
#endif
if (p != NULL) { if (p != NULL) {
p->subsidiaries.pCtx = pValCtx; p->subsidiaries.pCtx = pValCtx;
p->subsidiaries.num = num; p->subsidiaries.num = num;
...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node // TODO: get it from stable scan node
pCond->twindows = pTableScanNode->scanRange; pCond->twindows = pTableScanNode->scanRange;
pCond->suid = pTableScanNode->scan.suid; pCond->suid = pTableScanNode->scan.suid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;
...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
// todo refactor
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order) { int32_t order) {
STimeWindow w = {0}; STimeWindow w = {0};
......
此差异已折叠。
...@@ -1677,6 +1677,87 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { ...@@ -1677,6 +1677,87 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
return pBlock; return pBlock;
} }
int32_t convertTagDataToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len) {
int32_t n = 0;
switch (type) {
case TSDB_DATA_TYPE_NULL:
n = sprintf(str, "null");
break;
case TSDB_DATA_TYPE_BOOL:
n = sprintf(str, (*(int8_t*)buf) ? "true" : "false");
break;
case TSDB_DATA_TYPE_TINYINT:
n = sprintf(str, "%d", *(int8_t*)buf);
break;
case TSDB_DATA_TYPE_SMALLINT:
n = sprintf(str, "%d", *(int16_t*)buf);
break;
case TSDB_DATA_TYPE_INT:
n = sprintf(str, "%d", *(int32_t*)buf);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%.5f", GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = sprintf(str, "%.9f", GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
if (bufSize < 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
memcpy(str, buf, bufSize);
n = bufSize;
break;
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str);
if (length <= 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
n = length;
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%u", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%u", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%u", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
break;
default:
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (len) *len = n;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
...@@ -1747,14 +1828,24 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -1747,14 +1828,24 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pColInfoData = taosArrayGet(p->pDataBlock, 2); pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, stableName, false); colDataAppend(pColInfoData, numOfRows, stableName, false);
// tag name
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name); STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name);
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, tagName, false); colDataAppend(pColInfoData, numOfRows, tagName, false);
// tag type
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type; int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
pColInfoData = taosArrayGet(p->pDataBlock, 4); pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppend(pColInfoData, numOfRows, (char*)&tagType, false); char tagTypeStr[VARSTR_HEADER_SIZE + 32];
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
if (tagType == TSDB_DATA_TYPE_VARCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE));
} else if (tagType == TSDB_DATA_TYPE_NCHAR) {
tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(tagTypeStr, tagTypeLen);
colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false);
STagVal tagVal = {0}; STagVal tagVal = {0};
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId; tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
...@@ -1789,7 +1880,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -1789,7 +1880,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE); : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
tagVarChar = taosMemoryMalloc(bufSize); tagVarChar = taosMemoryMalloc(bufSize);
int32_t len = -1; int32_t len = -1;
dataConverToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
varDataSetLen(tagVarChar, len); varDataSetLen(tagVarChar, len);
} }
} }
......
...@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
} }
...@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
...@@ -1092,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1092,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -1248,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1248,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2043,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2043,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2207,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2207,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pSliceInfo->pRes; SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
// if (pOperator->status == OP_RES_TO_RETURN) { // if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
...@@ -2348,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2348,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->win = pInterpPhyNode->timeRange; pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
pOperator->name = "TimeSliceOperator"; pOperator->name = "TimeSliceOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
...@@ -2542,6 +2539,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2542,6 +2539,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
} }
if (find && pUpdated) { if (find && pUpdated) {
saveResultRow(pCurResult, pWinRes->groupId, pUpdated); saveResultRow(pCurResult, pWinRes->groupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur);
} }
} }
} }
...@@ -2662,6 +2660,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2662,6 +2660,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
......
...@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
SSDataBlock *pRes = NULL; SSDataBlock *pRes = NULL;
...@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryEnd) { if (queryStop) {
*queryEnd = true; *queryStop = true;
} }
break; break;
...@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
if (!qcontinue) { if (!qcontinue) {
if (queryStop) {
*queryStop = true;
}
break; break;
} }
...@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false; bool queryStop = false;
do { do {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
...@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
atomic_store_8((int8_t *)&ctx->queryContinue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
...@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
......
...@@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg); ...@@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst);
int64_t transAllocHandle(); int64_t transAllocHandle();
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
......
...@@ -37,9 +37,11 @@ typedef struct SCliConn { ...@@ -37,9 +37,11 @@ typedef struct SCliConn {
uint32_t port; uint32_t port;
SDelayTask* task; SDelayTask* task;
// debug and log info // debug and log info
struct sockaddr_in addr; char src[32];
struct sockaddr_in localAddr; char dst[32];
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); ...@@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle); // static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv // alloc buf for recv
...@@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) { ...@@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) {
} }
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn),
conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn), conn,
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code); TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, transMsg.code);
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
...@@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) { ...@@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tGTrace("%s conn %p %s is sent to %s, local info %s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), pConn->dst, pConn->src);
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
...@@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) {
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
int addrlen = sizeof(pConn->addr); // int addrlen = sizeof(pConn->addr);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen); struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst);
addrlen = sizeof(pConn->localAddr); addrlen = sizeof(sockname);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
transGetSockDebugInfo(&sockname, pConn->src);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
......
...@@ -102,7 +102,14 @@ void transFreeMsg(void* msg) { ...@@ -102,7 +102,14 @@ void transFreeMsg(void* msg) {
} }
taosMemoryFree((char*)msg - sizeof(STransMsgHead)); taosMemoryFree((char*)msg - sizeof(STransMsgHead));
} }
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
int transInitBuffer(SConnBuffer* buf) { int transInitBuffer(SConnBuffer* buf) {
transClearBuffer(buf); transClearBuffer(buf);
return 0; return 0;
......
...@@ -43,9 +43,13 @@ typedef struct SSvrConn { ...@@ -43,9 +43,13 @@ typedef struct SSvrConn {
SSvrRegArg regArg; SSvrRegArg regArg;
bool broken; // conn broken; bool broken; // conn broken;
ConnStatus status; ConnStatus status;
struct sockaddr_in addr;
struct sockaddr_in localAddr; uint32_t clientIp;
uint16_t port;
char src[32];
char dst[32];
int64_t refId; int64_t refId;
int spi; int spi;
...@@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) {
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d", transLabel(pTransInst), pConn, tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen);
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
} else { } else {
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d, code:%d", tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, resp:%d, code:%d", transLabel(pTransInst),
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port),
transMsg.contLen, pHead->noResp, transMsg.code);
// no ref here
} }
// pHead->noResp = 1, // pHead->noResp = 1,
...@@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) {
// set up conn info // set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.conn); SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr); pConnInfo->clientIp = pConn->clientIp;
pConnInfo->clientPort = ntohs(pConn->addr.sin_port); pConnInfo->clientPort = pConn->port;
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(transGetRefMgt(), pConn->refId); transReleaseExHandle(transGetRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
...@@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGTrace("%s conn %p %s is sent to %s:%d, local info:%s:%d, msglen:%d", transLabel(pTransInst), pConn, tGTrace("%s conn %p %s is sent to %s, local info:%s, msglen:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
pHead->msgLen = htonl(len); pHead->msgLen = htonl(len);
wb->base = msg; wb->base = msg;
...@@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tTrace("conn %p created, fd:%d", pConn, fd); tTrace("conn %p created, fd:%d", pConn, fd);
int addrlen = sizeof(pConn->addr); struct sockaddr peername, sockname;
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { int addrlen = sizeof(peername);
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&peername, &addrlen)) {
tError("conn %p failed to get peer info", pConn); tError("conn %p failed to get peer info", pConn);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
transGetSockDebugInfo(&peername, pConn->dst);
addrlen = sizeof(pConn->localAddr); addrlen = sizeof(sockname);
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) { if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) {
tError("conn %p failed to get local info", pConn); tError("conn %p failed to get local info", pConn);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
transGetSockDebugInfo(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
} else { } else {
......
...@@ -1327,6 +1327,8 @@ class Task(): ...@@ -1327,6 +1327,8 @@ class Task():
# TDengine 3.0 Error Codes: # TDengine 3.0 Error Codes:
0x0333, # Object is creating # TODO: this really is NOT an acceptable error 0x0333, # Object is creating # TODO: this really is NOT an acceptable error
0x0369, # Tag already exists
0x0388, # Database not exist
0x03A0, # STable already exists 0x03A0, # STable already exists
0x03A1, # STable [does] not exist 0x03A1, # STable [does] not exist
0x03AA, # Tag already exists 0x03AA, # Tag already exists
......
...@@ -101,7 +101,7 @@ ...@@ -101,7 +101,7 @@
./test.sh -f tsim/parser/constCol.sim ./test.sh -f tsim/parser/constCol.sim
#./test.sh -f tsim/parser/create_db.sim #./test.sh -f tsim/parser/create_db.sim
./test.sh -f tsim/parser/create_mt.sim ./test.sh -f tsim/parser/create_mt.sim
# TD-17653 ./test.sh -f tsim/parser/create_tb_with_tag_name.sim ./test.sh -f tsim/parser/create_tb_with_tag_name.sim
./test.sh -f tsim/parser/create_tb.sim ./test.sh -f tsim/parser/create_tb.sim
./test.sh -f tsim/parser/dbtbnameValidate.sim ./test.sh -f tsim/parser/dbtbnameValidate.sim
./test.sh -f tsim/parser/distinct.sim ./test.sh -f tsim/parser/distinct.sim
......
...@@ -93,7 +93,7 @@ sql_error create table tb11 using st2 (id,t1,) tags (1,1,1); ...@@ -93,7 +93,7 @@ sql_error create table tb11 using st2 (id,t1,) tags (1,1,1);
sql create table tb12 using st2 (t1,id) tags (2,1); sql create table tb12 using st2 (t1,id) tags (2,1);
sql show tags from tb12; sql show tags from tb12;
if $rows != 5 then if $rows != 4 then
return -1 return -1
endi endi
if $data05 != 1 then if $data05 != 1 then
...@@ -109,9 +109,9 @@ if $data35 != NULL then ...@@ -109,9 +109,9 @@ if $data35 != NULL then
return -1 return -1
endi endi
sql create table tb13 using st2 ("t1",'id') tags (2,1); sql create table tb13 using st2 (t1,id) tags (2,1);
sql show tags from tb13; sql show tags from tb13;
if $rows != 2 then if $rows != 4 then
return -1 return -1
endi endi
if $data05 != 1 then if $data05 != 1 then
......
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("flush db to let data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# after start consume, continue insert some data
paraDict['batchNum'] = 100
paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
pInsertThread.join()
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
if expectRowsList[0] != resultList[0]:
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("flush database %s"%(paraDict['dbName']))
for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeRows = resultList[0]
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows):
tdLog.exit("%d tmq consume rows error!"%consumerId)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)):
tdLog.exit("%d tmq consume rows error!"%consumerId)
for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -17,8 +17,8 @@ from tmqCommon import * ...@@ -17,8 +17,8 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.vgroups = 1 self.vgroups = 4
self.ctbNum = 100 self.ctbNum = 1
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
def init(self, conn, logSql): def init(self, conn, logSql):
...@@ -38,9 +38,9 @@ class TDTestCase: ...@@ -38,9 +38,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 100, 'ctbNum': 1,
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 3000, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10, 'pollDelay': 10,
'showMsg': 1, 'showMsg': 1,
...@@ -85,7 +85,7 @@ class TDTestCase: ...@@ -85,7 +85,7 @@ class TDTestCase:
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 100, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 1}
...@@ -117,17 +117,16 @@ class TDTestCase: ...@@ -117,17 +117,16 @@ class TDTestCase:
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# after start consume, continue insert some data # after start consume, continue insert some data
paraDict['batchNum'] = 100 paraDict['batchNum'] = 100
paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# pInsertThread.join()
tdSql.query(queryString) tdSql.query(queryString)
expectRowsList.append(tdSql.getRows()) expectRowsList.append(tdSql.getRows())
...@@ -135,15 +134,16 @@ class TDTestCase: ...@@ -135,15 +134,16 @@ class TDTestCase:
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
if expectRowsList[0] != resultList[0]: if expectRowsList[0] != resultList[0]:
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
tmqCom.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10) tdSql.query("flush database %s"%(paraDict['dbName']))
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
...@@ -204,13 +204,12 @@ class TDTestCase: ...@@ -204,13 +204,12 @@ class TDTestCase:
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeRows = resultList[0]
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows):
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
consumerId = 2 consumerId = 2
...@@ -224,15 +223,13 @@ class TDTestCase: ...@@ -224,15 +223,13 @@ class TDTestCase:
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0] actConsumeRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
tdSql.query("drop topic %s"%topicNameList[i]) tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ") tdLog.printNoPrefix("======== test case 2 end ...... ")
...@@ -241,7 +238,7 @@ class TDTestCase: ...@@ -241,7 +238,7 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
self.prepareTestEnv() self.prepareTestEnv()
self.tmqCase1() self.tmqCase1()
# self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -151,41 +151,6 @@ class TDTestCase: ...@@ -151,41 +151,6 @@ class TDTestCase:
if not (totalConsumeRows == totalRowsFromQury): if not (totalConsumeRows == totalRowsFromQury):
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
# tmqCom.initConsumerTable()
# consumerId = 1
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
# topicList = topicFromStb1
# ifcheckdata = 0
# ifManualCommit = 0
# keyList = 'group.id:cgrp2,\
# enable.auto.commit:true,\
# auto.commit.interval.ms:3000,\
# auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# tdLog.info("start consume processor")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
# totalConsumeRows = 0
# for i in range(expectRows):
# totalConsumeRows += resultList[i]
# tdSql.query(queryString)
# totalRowsFromQury = tdSql.getRows()
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
# if not (totalConsumeRows == totalRowsFromQury):
# tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) tmqCom.waitSubscriptionExit(tdSql, topicFromStb1)
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -259,7 +224,7 @@ class TDTestCase: ...@@ -259,7 +224,7 @@ class TDTestCase:
tdLog.info("create some new child table and insert data ") tdLog.info("create some new child table and insert data ")
paraDict["batchNum"] = 100 paraDict["batchNum"] = 100
paraDict["ctbPrefix"] = 'newCtb' paraDict["ctbPrefix"] = 'newCtb'
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"]) tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
......
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 1000
self.rowsPerTbl = 10
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 100,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 0
elif self.snapshot == 1:
consumerId = 1
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"])
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 1 end ...... ")
# drop some ntbs and create some new ntbs
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 100,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0,
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdLog.info("start create database....")
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("start create normal tables....")
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("create topics from database")
topicFromDb = 'topic_dbt'
tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName']))
if self.snapshot == 0:
consumerId = 2
elif self.snapshot == 1:
consumerId = 3
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
topicList = topicFromDb
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("drop some ntables")
# drop 1/4 ctbls from half offset
paraDict["ctbStartIdx"] = paraDict["ctbStartIdx"] + int(paraDict["ctbNum"] * 1 / 2)
paraDict["ctbNum"] = int(paraDict["ctbNum"] / 4)
tmqCom.drop_ctable(tdSql, dbname=paraDict['dbName'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict["ctbPrefix"], ctbStartIdx=paraDict["ctbStartIdx"])
tdLog.info("start create some new normal tables....")
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = self.ctbNum
tmqCom.create_ntable(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=paraDict["ctbNum"])
tdLog.info("start insert data into these new normal tables....")
tmqCom.insert_rows_into_ntbl(tsql=tdSql, dbname=paraDict["dbName"], tbname_prefix=paraDict["ctbPrefix"], tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"],startTs=paraDict["startTs"], tblNum=paraDict["ctbNum"], rows=paraDict["rowsPerTbl"])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
tmqCom.waitSubscriptionExit(tdSql, topicFromDb)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0
self.tmqCase1()
self.tmqCase2()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
# self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -18,7 +18,7 @@ class TDTestCase: ...@@ -18,7 +18,7 @@ class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0 self.snapshot = 0
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 100 self.ctbNum = 1000
self.rowsPerTbl = 10 self.rowsPerTbl = 10
def init(self, conn, logSql): def init(self, conn, logSql):
...@@ -39,9 +39,9 @@ class TDTestCase: ...@@ -39,9 +39,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb', 'ctbPrefix': 'ntb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 100, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 100,
'batchNum': 1000, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0, 'endTs': 0,
'pollDelay': 5, 'pollDelay': 5,
...@@ -125,9 +125,9 @@ class TDTestCase: ...@@ -125,9 +125,9 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ntb', 'ctbPrefix': 'ntb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 100, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 100,
'batchNum': 1000, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0, 'endTs': 0,
'pollDelay': 10, 'pollDelay': 10,
...@@ -203,16 +203,16 @@ class TDTestCase: ...@@ -203,16 +203,16 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ") tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self): def run(self):
tdLog.printNoPrefix("=============================================") # tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.snapshot = 0 # self.snapshot = 0
# self.tmqCase1() # self.tmqCase1()
self.tmqCase2() # self.tmqCase2()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
# self.tmqCase1() self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
......
...@@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py ...@@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py
python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
...@@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py ...@@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py
python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/tmqDropStbCtb.py python3 ./test.py -f 7-tmq/tmqDropStbCtb.py
python3 ./test.py -f 7-tmq/tmqDropNtb.py python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot0.py
python3 ./test.py -f 7-tmq/tmqDropNtb-snapshot1.py
python3 ./test.py -f 7-tmq/tmqUdf.py python3 ./test.py -f 7-tmq/tmqUdf.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py # python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
#------------querPolicy 2----------- #------------querPolicy 2-----------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册