提交 141ef5af 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1

...@@ -115,8 +115,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks); ...@@ -115,8 +115,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData); void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......
...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
lastKey = rowKey; lastKey = rowKey;
++pCols->numOfRows; if (pCols) {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); ++pCols->numOfRows;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
}
} else { } else {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
} }
...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
#endif #endif
} }
if (lastKey != TSKEY_INITIAL_VAL) { if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
++pCols->numOfRows; ++pCols->numOfRows;
} }
......
...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { ...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
return -1; return -1;
} }
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb), tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid); vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, ...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level) { int8_t level) {
SArray *pResult = NULL; SArray *pResult = NULL;
if (!taskInfo) {
tsdbDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, REPO_ID(pTsdb), level, suid);
return TSDB_CODE_SUCCESS;
}
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid); suid);
...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType ...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid); tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pRSmaInfo->taskInfo[0]) {
tsdbDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
}
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
......
...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = { ...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = {
.isHeap = false, .isHeap = false,
.isWeak = 0, .isWeak = 0,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI, .tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.update = 0, .update = 1,
.compression = 2, .compression = 2,
.slLevel = 5, .slLevel = 5,
.days = 10, .days = 10,
......
...@@ -225,6 +225,7 @@ typedef struct SExecTaskInfo { ...@@ -225,6 +225,7 @@ typedef struct SExecTaskInfo {
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SSubplan *plan;
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
......
...@@ -36,21 +36,6 @@ typedef struct STaskMgmt { ...@@ -36,21 +36,6 @@ typedef struct STaskMgmt {
bool closed; bool closed;
} STaskMgmt; } STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
......
...@@ -1225,6 +1225,8 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1225,6 +1225,8 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
taosVariantDestroy(&pCtx[i].tag); taosVariantDestroy(&pCtx[i].tag);
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
} }
taosMemoryFreeClear(pCtx); taosMemoryFreeClear(pCtx);
...@@ -2840,9 +2842,9 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray ...@@ -2840,9 +2842,9 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) { SArray* pColList) {
blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { // data from other sources if (pColList == NULL) { // data from other sources
blockDataEnsureCapacity(pRes, numOfRows);
int32_t dataLen = *(int32_t*)pData; int32_t dataLen = *(int32_t*)pData;
pData += sizeof(int32_t); pData += sizeof(int32_t);
...@@ -2898,20 +2900,23 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2898,20 +2900,23 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
SSDataBlock block = {.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)), .info.numOfCols = numOfCols}; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
idata.info.type = pSchema[i].type; idata.info.type = pSchema[i].type;
idata.info.bytes = pSchema[i].bytes; idata.info.bytes = pSchema[i].bytes;
idata.info.colId = pSchema[i].colId; idata.info.colId = pSchema[i].colId;
taosArrayPush(block.pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) { if (IS_VAR_DATA_TYPE(idata.info.type)) {
block.info.hasVarCol = true; pBlock->info.hasVarCol = true;
} }
} }
blockDataEnsureCapacity(&block, numOfRows); blockDataEnsureCapacity(pBlock, numOfRows);
int32_t dataLen = *(int32_t*)pStart; int32_t dataLen = *(int32_t*)pStart;
uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t)); uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t));
...@@ -2924,7 +2929,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2924,7 +2929,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0); ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(block.pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i]; pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i]; pColInfoData->varmeta.allocLen = colLen[i];
...@@ -2943,7 +2948,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -2943,7 +2948,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
} }
// data from mnode // data from mnode
relocateColumnData(pRes, pColList, block.pDataBlock); relocateColumnData(pRes, pColList, pBlock->pDataBlock);
taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock);
// blockDataDestroy(pBlock);
} }
pRes->info.rows = numOfRows; pRes->info.rows = numOfRows;
...@@ -4184,6 +4192,18 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -4184,6 +4192,18 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->numOfDownstream = 0; pOperator->numOfDownstream = 0;
} }
if (pOperator->pExpr != NULL) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pOperator->pExpr[i];
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
}
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
}
}
taosMemoryFree(pOperator->pExpr);
taosMemoryFreeClear(pOperator->info); taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
} }
...@@ -4195,8 +4215,6 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -4195,8 +4215,6 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
pAggSup->pResultRowHashTable == NULL) { pAggSup->pResultRowHashTable == NULL) {
...@@ -4358,6 +4376,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4358,6 +4376,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -5189,6 +5209,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -5189,6 +5209,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
(*pTaskInfo)->plan = pPlan;
return code; return code;
_complete: _complete:
...@@ -5287,10 +5308,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -5287,10 +5308,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo);
destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults); // taosHashCleanup(pTaskInfo->summary.operatorProfResults);
destroyOperatorInfo(pTaskInfo->pRoot);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo); taosMemoryFreeClear(pTaskInfo);
......
...@@ -670,6 +670,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -670,6 +670,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
} }
taosArrayDestroy(pInfo->scanCols);
} }
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
...@@ -1154,7 +1156,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe ...@@ -1154,7 +1156,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
NULL, NULL, NULL); NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -1248,6 +1250,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1248,6 +1250,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SExprInfo* pExprInfo = &pOperator->pExpr[0]; SExprInfo* pExprInfo = &pOperator->pExpr[0];
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0); SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
char str[512] = {0}; char str[512] = {0};
......
...@@ -181,7 +181,16 @@ class TDTestCase: ...@@ -181,7 +181,16 @@ class TDTestCase:
tdSql.error("select timediff(10,1,1.5) from stb") tdSql.error("select timediff(10,1,1.5) from stb")
# tdSql.error("select timediff(10,1,2s) from stb") # tdSql.error("select timediff(10,1,2s) from stb")
# tdSql.error("select timedifff(10,1,c1) from stb") # tdSql.error("select timedifff(10,1,c1) from stb")
tdSql.error("select timediff(1.5,1.5) from stb_1")
tdSql.error("select timediff(1) from stb_1")
tdSql.error("select timediff(10,1,1.5) from stb_1")
# tdSql.error("select timediff(10,1,2s) from stb_1")
# tdSql.error("select timedifff(10,1,c1) from stb_1")
tdSql.error("select timediff(1.5,1.5) from ntb")
tdSql.error("select timediff(1) from ntb")
tdSql.error("select timediff(10,1,1.5) from ntb")
# tdSql.error("select timediff(10,1,2s) from ntb")
# tdSql.error("select timedifff(10,1,c1) from ntb")
......
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10
self.ts = 1537146000000
self.perfix = 'dev'
self.tables = 10
def insertData(self):
print("==============step1")
tdSql.execute(
"create table if not exists st (ts timestamp, col int) tags(dev nchar(50))")
for i in range(self.tables):
tdSql.execute("create table %s%d using st tags(%d)" % (self.perfix, i, i))
rows = 15 + i
for j in range(rows):
tdSql.execute("insert into %s%d values(%d, %d)" %(self.perfix, i, self.ts + i * 20 * 10000 + j * 10000, j))
def run(self):
tdSql.prepare()
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute("create table stb_1 using stb tags('beijing')")
tdSql.execute("insert into stb_1 values(%d, 0, 0, 0, 0, 0.0, 0.0, False, ' ', ' ', 0, 0, 0, 0)" % (self.ts - 1))
# diff verifacation
tdSql.query("select diff(col1) from stb_1")
tdSql.checkRows(0)
tdSql.query("select diff(col2) from stb_1")
tdSql.checkRows(0)
tdSql.query("select diff(col3) from stb_1")
tdSql.checkRows(0)
tdSql.query("select diff(col4) from stb_1")
tdSql.checkRows(0)
tdSql.query("select diff(col5) from stb_1")
tdSql.checkRows(0)
tdSql.query("select diff(col6) from stb_1")
tdSql.checkRows(0)
for i in range(self.rowNum):
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
tdSql.error("select diff(ts) from stb")
tdSql.error("select diff(ts) from stb_1")
tdSql.error("select diff(col1) from stb")
tdSql.error("select diff(col2) from stb")
tdSql.error("select diff(col3) from stb")
tdSql.error("select diff(col4) from stb")
tdSql.error("select diff(col5) from stb")
tdSql.error("select diff(col6) from stb")
tdSql.error("select diff(col7) from stb")
tdSql.error("select diff(col7) from stb_1")
tdSql.error("select diff(col8) from stb")
tdSql.error("select diff(col8) from stb_1")
tdSql.error("select diff(col9) from stb")
tdSql.error("select diff(col9) from stb_1")
tdSql.error("select diff(col11) from stb_1")
tdSql.error("select diff(col12) from stb_1")
tdSql.error("select diff(col13) from stb_1")
tdSql.error("select diff(col14) from stb_1")
tdSql.error("select diff(col11) from stb")
tdSql.error("select diff(col12) from stb")
tdSql.error("select diff(col13) from stb")
tdSql.error("select diff(col14) from stb")
tdSql.query("select ts,diff(col1),ts from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql.query("select ts,diff(col1),ts from stb group by tbname")
tdSql.checkRows(10)
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql.query("select ts,diff(col1),ts from stb_1")
tdSql.checkRows(10)
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql.query("select ts,diff(col1),ts from stb group by tbname")
tdSql.checkRows(10)
tdSql.checkData(0, 0, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 1, "2018-09-17 09:00:00.000")
tdSql.checkData(0, 3, "2018-09-17 09:00:00.000")
tdSql.checkData(9, 0, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 1, "2018-09-17 09:00:00.009")
tdSql.checkData(9, 3, "2018-09-17 09:00:00.009")
tdSql.query("select diff(col1) from stb_1")
tdSql.checkRows(10)
tdSql.query("select diff(col2) from stb_1")
tdSql.checkRows(10)
tdSql.query("select diff(col3) from stb_1")
tdSql.checkRows(10)
tdSql.query("select diff(col4) from stb_1")
tdSql.checkRows(10)
tdSql.query("select diff(col5) from stb_1")
tdSql.checkRows(10)
tdSql.query("select diff(col6) from stb_1")
tdSql.checkRows(10)
self.insertData()
tdSql.query("select diff(col) from st group by tbname")
tdSql.checkRows(185)
tdSql.error("select diff(col) from st group by dev")
tdSql.error("select diff(col) from st group by col")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -21,12 +21,12 @@ class TDTestCase: ...@@ -21,12 +21,12 @@ class TDTestCase:
tdSql.execute("insert into stb_1(ts) values(%d)" % (self.ts - 1)) tdSql.execute("insert into stb_1(ts) values(%d)" % (self.ts - 1))
# last verifacation # last verifacation
# tdSql.query("select last(*) from stb_1") tdSql.query("select last(*) from stb_1")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, None) tdSql.checkData(0, 1, None)
# tdSql.query("select last(*) from db.stb_1") tdSql.query("select last(*) from db.stb_1")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, None) tdSql.checkData(0, 1, None)
tdSql.query("select last(col1) from stb_1") tdSql.query("select last(col1) from stb_1")
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.query("select last(col1) from db.stb_1") tdSql.query("select last(col1) from db.stb_1")
...@@ -86,12 +86,12 @@ class TDTestCase: ...@@ -86,12 +86,12 @@ class TDTestCase:
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) % (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
# tdSql.query("select last(*) from stb_1") tdSql.query("select last(*) from stb_1")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, 10) tdSql.checkData(0, 1, 10)
# tdSql.query("select last(*) from db.stb_1") tdSql.query("select last(*) from db.stb_1")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, 10) tdSql.checkData(0, 1, 10)
tdSql.query("select last(col1) from stb_1") tdSql.query("select last(col1) from stb_1")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 10) tdSql.checkData(0, 0, 10)
...@@ -175,12 +175,12 @@ class TDTestCase: ...@@ -175,12 +175,12 @@ class TDTestCase:
tdSql.execute('''create table ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute('''create table ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
tdSql.execute("insert into ntb(ts) values(%d)" % (self.ts - 1)) tdSql.execute("insert into ntb(ts) values(%d)" % (self.ts - 1))
# tdSql.query("select last(*) from ntb") tdSql.query("select last(*) from ntb")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, None) tdSql.checkData(0, 1, None)
# tdSql.query("select last(*) from db.ntb") tdSql.query("select last(*) from db.ntb")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 1, None) tdSql.checkData(0, 1, None)
tdSql.query("select last(col1) from ntb") tdSql.query("select last(col1) from ntb")
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.query("select last(col1) from db.ntb") tdSql.query("select last(col1) from db.ntb")
...@@ -316,12 +316,12 @@ class TDTestCase: ...@@ -316,12 +316,12 @@ class TDTestCase:
tdSql.query("select last(col8) from db.ntb") tdSql.query("select last(col8) from db.ntb")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.checkData(0, 0, 'taosdata10') tdSql.checkData(0, 0, 'taosdata10')
# tdSql.query("select last(col9) from ntb") tdSql.query("select last(col9) from ntb")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 0, '涛思数据10') tdSql.checkData(0, 0, '涛思数据10')
# tdSql.query("select last(col9) from db.ntb") tdSql.query("select last(col9) from db.ntb")
# tdSql.checkRows(1) tdSql.checkRows(1)
# tdSql.checkData(0, 0, '涛思数据10') tdSql.checkData(0, 0, '涛思数据10')
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -23,7 +23,8 @@ python3 ./test.py -f 2-query/last.py ...@@ -23,7 +23,8 @@ python3 ./test.py -f 2-query/last.py
python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/To_unixtimestamp.py
python3 ./test.py -f 2-query/timetruncate.py python3 ./test.py -f 2-query/timetruncate.py
# python3 ./test.py -f 2-query/Timediff.py python3 ./test.py -f 2-query/Timediff.py
# python3 ./test.py -f 2-query/diff.py
#python3 ./test.py -f 2-query/cast.py #python3 ./test.py -f 2-query/cast.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册