提交 134ad9fa 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into enh/elapsed_split

...@@ -146,6 +146,7 @@ typedef struct SqlFunctionCtx { ...@@ -146,6 +146,7 @@ typedef struct SqlFunctionCtx {
struct SDiskbasedBuf *pBuf; struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
int32_t curBufPage; int32_t curBufPage;
bool increase;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
......
...@@ -1752,7 +1752,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1752,7 +1752,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayClear(tagArray); taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
if (!pTag) { if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return NULL; return NULL;
...@@ -1763,9 +1763,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1763,9 +1763,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) { if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return NULL; return NULL;
} }
...@@ -1804,8 +1802,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1804,8 +1802,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0; int32_t schemaLen = 0;
if (createTb) { if (createTb) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname; createTbReq.name = cname;
createTbReq.flags = 0; createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
...@@ -1819,7 +1816,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1819,7 +1816,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayPush(tagArray, &tagVal); taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL; STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
if (!pTag) { if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret); taosMemoryFreeClear(ret);
...@@ -1945,7 +1942,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen ...@@ -1945,7 +1942,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows); blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
const char* pStart = pData; const char* pStart = pData;
...@@ -2019,6 +2015,7 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t ...@@ -2019,6 +2015,7 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pStart += colLen[i]; pStart += colLen[i];
} }
pBlock->info.rows = numOfRows;
ASSERT(pStart - pData == dataLen); ASSERT(pStart - pData == dataLen);
return pStart; return pStart;
} }
...@@ -186,7 +186,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p ...@@ -186,7 +186,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) { if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
continue; continue;
} }
...@@ -286,7 +286,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -286,7 +286,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false; bool hasExtraSink = false;
if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) { bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
if (totLevel == 2 || externalTargetDB) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
...@@ -405,7 +406,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -405,7 +406,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb); ASSERT(pDb);
...@@ -426,10 +426,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -426,10 +426,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) { for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
if (pLastLevelTask->nodeId == pVgInfo->vgId) { if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId; pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break; break;
} }
} }
......
...@@ -804,7 +804,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t ...@@ -804,7 +804,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -892,8 +892,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -892,8 +892,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey); SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex); SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
......
...@@ -1130,6 +1130,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1130,6 +1130,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->start.key = INT64_MIN; pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams; pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->increase = false;
pCtx->param = pFunct->pParam; pCtx->param = pFunct->pParam;
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) { // for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
...@@ -2008,8 +2009,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI ...@@ -2008,8 +2009,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) { if (pCtx[j].increase) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); int64_t ts = *(int64_t*) in;
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, (const char *)&ts, pCtx[j].resultInfo->isNullRes);
ts++;
}
} else {
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
} }
} }
} }
...@@ -4676,7 +4685,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4676,7 +4685,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo); bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8; int32_t children = 8;
...@@ -5339,7 +5349,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo ...@@ -5339,7 +5349,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t size) {
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
pSup->pResultRows = taosArrayInit(1024, size); pSup->pResultRows = taosArrayInit(1024, size);
...@@ -5358,15 +5369,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size ...@@ -5358,15 +5369,11 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size
if (bufSize <= pageSize) { if (bufSize <= pageSize) {
bufSize = pageSize * 4; bufSize = pageSize * 4;
} }
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH); int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
} for(int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey) { }
return initStreamAggSupporter(pSup, pKey, sizeof(SResultWindowInfo)); return code;
}
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return initStreamAggSupporter(pSup, pKey, sizeof(SStateWindowInfo));
} }
int64_t getSmaWaterMark(int64_t interval, double filesFactor) { int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
......
...@@ -1438,9 +1438,15 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt ...@@ -1438,9 +1438,15 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed; return needed;
} }
void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTARTTS) {
pCtx[0].increase = true;
}
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1460,6 +1466,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1460,6 +1466,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t code = int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
if (isStream) {
ASSERT(numOfCols > 0);
increaseTs(pInfo->binfo.pCtx);
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
...@@ -2128,6 +2139,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2128,6 +2139,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str); pResBlock, keyBufSize, pTaskInfo->id.str);
ASSERT(numOfCols > 0);
increaseTs(pInfo->binfo.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -2212,6 +2225,8 @@ int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, ...@@ -2212,6 +2225,8 @@ int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = NULL; pBasicInfo->pCtx[i].pBuf = NULL;
} }
ASSERT(numOfCols > 0);
increaseTs(pBasicInfo->pCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2228,6 +2243,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int ...@@ -2228,6 +2243,10 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
} }
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
...@@ -2244,8 +2263,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx ...@@ -2244,8 +2263,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3097,6 +3116,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3097,6 +3116,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SStateWindowInfo));
}
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode; SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
...@@ -3130,8 +3153,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3130,8 +3153,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error; goto _error;
} }
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo", pInfo->binfo.pCtx, numOfCols);
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -102,6 +102,8 @@ bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); ...@@ -102,6 +102,8 @@ bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
...@@ -1103,6 +1103,22 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { ...@@ -1103,6 +1103,22 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
* *
*/ */
static bool validateHourRange(int8_t hour) {
if (hour < 0 || hour > 12) {
return false;
}
return true;
}
static bool validateMinuteRange(int8_t hour, int8_t minute, char sign) {
if (minute == 0 || (minute == 30 && (hour == 3 || hour == 5) && sign == '-')) {
return true;
}
return false;
}
static bool validateTimezoneFormat(const SValueNode* pVal) { static bool validateTimezoneFormat(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) { if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false; return false;
...@@ -1111,6 +1127,8 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { ...@@ -1111,6 +1127,8 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
char* tz = varDataVal(pVal->datum.p); char* tz = varDataVal(pVal->datum.p);
int32_t len = varDataLen(pVal->datum.p); int32_t len = varDataLen(pVal->datum.p);
char buf[3] = {0};
int8_t hour = -1, minute = -1;
if (len == 0) { if (len == 0) {
return false; return false;
} else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) { } else if (len == 1 && (tz[0] == 'z' || tz[0] == 'Z')) {
...@@ -1123,6 +1141,20 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { ...@@ -1123,6 +1141,20 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
if (!isdigit(tz[i])) { if (!isdigit(tz[i])) {
return false; return false;
} }
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 4) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
} }
break; break;
} }
...@@ -1134,9 +1166,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { ...@@ -1134,9 +1166,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
} }
continue; continue;
} }
if (!isdigit(tz[i])) { if (!isdigit(tz[i])) {
return false; return false;
} }
if (i == 2) {
memcpy(buf, &tz[i - 1], 2);
hour = taosStr2Int8(buf, NULL, 10);
if (!validateHourRange(hour)) {
return false;
}
} else if (i == 5) {
memcpy(buf, &tz[i - 1], 2);
minute = taosStr2Int8(buf, NULL, 10);
if (!validateMinuteRange(hour, minute, tz[0])) {
return false;
}
}
} }
break; break;
} }
...@@ -1422,6 +1469,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1422,6 +1469,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = topFunction, .processFunc = topFunction,
.finalizeFunc = topBotFinalize, .finalizeFunc = topBotFinalize,
.combineFunc = topCombine,
}, },
{ {
.name = "bottom", .name = "bottom",
...@@ -1431,7 +1479,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1431,7 +1479,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getTopBotFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = bottomFunction, .processFunc = bottomFunction,
.finalizeFunc = topBotFinalize .finalizeFunc = topBotFinalize,
.combineFunc = bottomCombine,
}, },
{ {
.name = "spread", .name = "spread",
......
...@@ -1389,6 +1389,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple ...@@ -1389,6 +1389,18 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
} }
} }
void releaseSource(STuplePos* pPos) {
if (pPos->pageId == -1) {
return ;
}
// Todo(liuyao) relase row
}
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
releaseSource(pDestPos);
*pDestPos = *pSourcePos;
}
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) { int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
...@@ -1400,10 +1412,12 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 ...@@ -1400,10 +1412,12 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
if (pSBuf->assign && if (pSBuf->assign &&
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) { ( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
*(double*) &pDBuf->v = *(double*) &pSBuf->v; *(double*) &pDBuf->v = *(double*) &pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
} }
} else { } else {
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) { if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
pDBuf->v = pSBuf->v; pDBuf->v = pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
} }
} }
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
...@@ -2856,7 +2870,6 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -2856,7 +2870,6 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type; int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
...@@ -2881,6 +2894,67 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2881,6 +2894,67 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pEntryInfo->numOfRes; return pEntryInfo->numOfRes;
} }
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
bool isTopQuery) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
STopBotResItem* pItems = pRes->pItems;
assert(pItems != NULL);
// not full yet
if (pEntryInfo->numOfRes < maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid;
pItem->tuplePos.pageId = -1;
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
pEntryInfo->numOfRes++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
!isTopQuery);
} else { // replace the minimum value in the result
if ((isTopQuery && (
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d > pItems[0].v.d)))
|| (!isTopQuery && (
(IS_SIGNED_NUMERIC_TYPE(type) && pSourceItem->v.i < pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && pSourceItem->v.u < pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && pSourceItem->v.d < pItems[0].v.d))
)) {
// replace the old data and the coresponding tuple data
STopBotResItem* pItem = &pItems[0];
pItem->v = pSourceItem->v;
pItem->uid = pSourceItem->uid;
// save the data of this tuple by over writing the old data
replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos);
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
topBotResComparFn, NULL, !isTopQuery);
}
}
}
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
addResult(pDestCtx, pSBuf->pItems + i, type, true);
}
return TSDB_CODE_SUCCESS;
}
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
addResult(pDestCtx, pSBuf->pItems + i, type, false);
}
return TSDB_CODE_SUCCESS;
}
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSpreadInfo); pEnv->calcMemSize = sizeof(SSpreadInfo);
return true; return true;
......
...@@ -348,7 +348,7 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn ...@@ -348,7 +348,7 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
char* t = taosMemoryCalloc(1, outputMaxLen); char* t = taosMemoryCalloc(1, outputMaxLen);
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen, &len); /*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len);
varDataSetLen(t, len); varDataSetLen(t, len);
colDataAppend(pOut->columnData, rowIndex, t, false); colDataAppend(pOut->columnData, rowIndex, t, false);
......
...@@ -253,7 +253,9 @@ int walRoll(SWal *pWal) { ...@@ -253,7 +253,9 @@ int walRoll(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry)); /*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate // TODO truncate
......
...@@ -174,9 +174,9 @@ int32_t taosRenameFile(const char *oldName, const char *newName) { ...@@ -174,9 +174,9 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) { int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
struct stat fileStat; struct stat fileStat;
#ifdef WINDOWS #ifdef WINDOWS
int32_t code = _stat(path, &fileStat); int32_t code = _stat(path, &fileStat);
#else #else
int32_t code = stat(path, &fileStat); int32_t code = stat(path, &fileStat);
#endif #endif
if (code < 0) { if (code < 0) {
return code; return code;
...@@ -201,7 +201,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { ...@@ -201,7 +201,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
#ifdef WINDOWS #ifdef WINDOWS
BY_HANDLE_FILE_INFORMATION bhfi; BY_HANDLE_FILE_INFORMATION bhfi;
HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd); HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd);
if (GetFileInformationByHandle(handle, &bhfi) == FALSE) { if (GetFileInformationByHandle(handle, &bhfi) == FALSE) {
printf("taosFStatFile get file info fail."); printf("taosFStatFile get file info fail.");
return -1; return -1;
...@@ -216,7 +216,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { ...@@ -216,7 +216,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
} }
#else #else
struct stat fileStat; struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat); int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) { if (code < 0) {
...@@ -238,7 +238,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { ...@@ -238,7 +238,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
void autoDelFileListAdd(const char *path) { return; } void autoDelFileListAdd(const char *path) { return; }
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
int fd = -1; int fd = -1;
FILE *fp = NULL; FILE *fp = NULL;
if (tdFileOptions & TD_FILE_STREAM) { if (tdFileOptions & TD_FILE_STREAM) {
...@@ -316,12 +316,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { ...@@ -316,12 +316,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
(*ppFile)->fp = NULL; (*ppFile)->fp = NULL;
} }
if ((*ppFile)->fd >= 0) { if ((*ppFile)->fd >= 0) {
#ifdef WINDOWS #ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd); HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd);
!FlushFileBuffers(h); !FlushFileBuffers(h);
#else #else
fsync((*ppFile)->fd); fsync((*ppFile)->fd);
#endif #endif
close((*ppFile)->fd); close((*ppFile)->fd);
(*ppFile)->fd = -1; (*ppFile)->fd = -1;
} }
...@@ -345,11 +345,11 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { ...@@ -345,11 +345,11 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
char *tbuf = (char *)buf; char *tbuf = (char *)buf;
while (leftbytes > 0) { while (leftbytes > 0) {
#ifdef WINDOWS #ifdef WINDOWS
readbytes = _read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes); readbytes = _read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#else #else
readbytes = read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes); readbytes = read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#endif #endif
if (readbytes < 0) { if (readbytes < 0) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
...@@ -433,9 +433,6 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { ...@@ -433,9 +433,6 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
} }
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock)); taosThreadRwlockRdlock(&(pFile->rwlock));
#endif #endif
...@@ -459,9 +456,9 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { ...@@ -459,9 +456,9 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
struct stat fileStat; struct stat fileStat;
#ifdef WINDOWS #ifdef WINDOWS
int32_t code = _fstat(pFile->fd, &fileStat); int32_t code = _fstat(pFile->fd, &fileStat);
#else #else
int32_t code = fstat(pFile->fd, &fileStat); int32_t code = fstat(pFile->fd, &fileStat);
#endif #endif
if (code < 0) { if (code < 0) {
return code; return code;
...@@ -565,12 +562,12 @@ int32_t taosFsyncFile(TdFilePtr pFile) { ...@@ -565,12 +562,12 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
if (pFile->fp != NULL) return fflush(pFile->fp); if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fd >= 0) { if (pFile->fd >= 0) {
#ifdef WINDOWS #ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
return !FlushFileBuffers(h); return !FlushFileBuffers(h);
#else #else
return fsync(pFile->fd); return fsync(pFile->fd);
#endif #endif
} }
return 0; return 0;
} }
......
...@@ -28,92 +28,48 @@ ...@@ -28,92 +28,48 @@
#ifdef WINDOWS #ifdef WINDOWS
#include <time.h> #include <stdlib.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h>
//#define TM_YEAR_BASE 1970 //origin //#define TM_YEAR_BASE 1970 //origin
#define TM_YEAR_BASE 1900 //slguan #define TM_YEAR_BASE 1900 // slguan
/* /*
* We do not implement alternate representations. However, we always * We do not implement alternate representations. However, we always
* check whether a given modifier is allowed for a certain conversion. * check whether a given modifier is allowed for a certain conversion.
*/ */
#define ALT_E 0x01 #define ALT_E 0x01
#define ALT_O 0x02 #define ALT_O 0x02
#define LEGAL_ALT(x) { if (alt_format & ~(x)) return (0); } #define LEGAL_ALT(x) \
{ \
if (alt_format & ~(x)) return (0); \
}
static int conv_num(const char **buf, int *dest, int llim, int ulim) static int conv_num(const char **buf, int *dest, int llim, int ulim) {
{ int result = 0;
int result = 0;
/* The limit also determines the number of valid digits. */ /* The limit also determines the number of valid digits. */
int rulim = ulim; int rulim = ulim;
if (**buf < '0' || **buf > '9') if (**buf < '0' || **buf > '9') return (0);
return (0);
do { do {
result *= 10; result *= 10;
result += *(*buf)++ - '0'; result += *(*buf)++ - '0';
rulim /= 10; rulim /= 10;
} while ((result * 10 <= ulim) && rulim && **buf >= '0' && **buf <= '9'); } while ((result * 10 <= ulim) && rulim && **buf >= '0' && **buf <= '9');
if (result < llim || result > ulim) if (result < llim || result > ulim) return (0);
return (0);
*dest = result; *dest = result;
return (1); return (1);
} }
static const char *day[7] = { static const char *day[7] = {"Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"};
"Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", static const char *abday[7] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"};
"Friday", "Saturday" static const char *mon[12] = {"January", "February", "March", "April", "May", "June",
}; "July", "August", "September", "October", "November", "December"};
static const char *abday[7] = { static const char *abmon[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"};
"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" static const char *am_pm[2] = {"AM", "PM"};
};
static const char *mon[12] = {
"January", "February", "March", "April", "May", "June", "July",
"August", "September", "October", "November", "December"
};
static const char *abmon[12] = {
"Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
};
static const char *am_pm[2] = {
"AM", "PM"
};
#define BILLION (1E9)
static BOOL g_first_time = 1;
static LARGE_INTEGER g_counts_per_sec;
int clock_gettime(int dummy, struct timespec *ct)
{
LARGE_INTEGER count;
if (g_first_time)
{
g_first_time = 0;
if (0 == QueryPerformanceFrequency(&g_counts_per_sec))
{
g_counts_per_sec.QuadPart = 0;
}
}
if ((NULL == ct) || (g_counts_per_sec.QuadPart <= 0) ||
(0 == QueryPerformanceCounter(&count)))
{
return -1;
}
ct->tv_sec = count.QuadPart / g_counts_per_sec.QuadPart;
ct->tv_nsec = ((count.QuadPart % g_counts_per_sec.QuadPart) * BILLION) / g_counts_per_sec.QuadPart;
return 0;
}
#else #else
#include <sys/time.h> #include <sys/time.h>
...@@ -121,301 +77,265 @@ int clock_gettime(int dummy, struct timespec *ct) ...@@ -121,301 +77,265 @@ int clock_gettime(int dummy, struct timespec *ct)
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) { char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) {
#ifdef WINDOWS #ifdef WINDOWS
char c; char c;
const char *bp; const char *bp;
size_t len = 0; size_t len = 0;
int alt_format, i, split_year = 0; int alt_format, i, split_year = 0;
bp = buf; bp = buf;
while ((c = *fmt) != '\0') { while ((c = *fmt) != '\0') {
/* Clear `alternate' modifier prior to new conversion. */ /* Clear `alternate' modifier prior to new conversion. */
alt_format = 0; alt_format = 0;
/* Eat up white-space. */ /* Eat up white-space. */
if (isspace(c)) { if (isspace(c)) {
while (isspace(*bp)) while (isspace(*bp)) bp++;
bp++;
fmt++; fmt++;
continue; continue;
} }
if ((c = *fmt++) != '%')
goto literal;
if ((c = *fmt++) != '%') goto literal;
again: switch (c = *fmt++) { again:
case '%': /* "%%" is converted to "%". */ switch (c = *fmt++) {
literal : case '%': /* "%%" is converted to "%". */
if (c != *bp++) literal:
return (0); if (c != *bp++) return (0);
break; break;
/* /*
* "Alternative" modifiers. Just set the appropriate flag * "Alternative" modifiers. Just set the appropriate flag
* and start over again. * and start over again.
*/ */
case 'E': /* "%E?" alternative conversion modifier. */ case 'E': /* "%E?" alternative conversion modifier. */
LEGAL_ALT(0); LEGAL_ALT(0);
alt_format |= ALT_E; alt_format |= ALT_E;
goto again; goto again;
case 'O': /* "%O?" alternative conversion modifier. */ case 'O': /* "%O?" alternative conversion modifier. */
LEGAL_ALT(0); LEGAL_ALT(0);
alt_format |= ALT_O; alt_format |= ALT_O;
goto again; goto again;
/* /*
* "Complex" conversion rules, implemented through recursion. * "Complex" conversion rules, implemented through recursion.
*/ */
case 'c': /* Date and time, using the locale's format. */ case 'c': /* Date and time, using the locale's format. */
LEGAL_ALT(ALT_E); LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%x %X", tm))) if (!(bp = taosStrpTime(bp, "%x %X", tm))) return (0);
return (0);
break; break;
case 'D': /* The date as "%m/%d/%y". */ case 'D': /* The date as "%m/%d/%y". */
LEGAL_ALT(0); LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) return (0);
return (0);
break; break;
case 'R': /* The time as "%H:%M". */ case 'R': /* The time as "%H:%M". */
LEGAL_ALT(0); LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%H:%M", tm))) if (!(bp = taosStrpTime(bp, "%H:%M", tm))) return (0);
return (0);
break; break;
case 'r': /* The time in 12-hour clock representation. */ case 'r': /* The time in 12-hour clock representation. */
LEGAL_ALT(0); LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%I:%M:%S %p", tm))) if (!(bp = taosStrpTime(bp, "%I:%M:%S %p", tm))) return (0);
return (0);
break; break;
case 'T': /* The time as "%H:%M:%S". */ case 'T': /* The time as "%H:%M:%S". */
LEGAL_ALT(0); LEGAL_ALT(0);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) return (0);
return (0);
break; break;
case 'X': /* The time, using the locale's format. */ case 'X': /* The time, using the locale's format. */
LEGAL_ALT(ALT_E); LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) if (!(bp = taosStrpTime(bp, "%H:%M:%S", tm))) return (0);
return (0);
break; break;
case 'x': /* The date, using the locale's format. */ case 'x': /* The date, using the locale's format. */
LEGAL_ALT(ALT_E); LEGAL_ALT(ALT_E);
if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) if (!(bp = taosStrpTime(bp, "%m/%d/%y", tm))) return (0);
return (0);
break; break;
/* /*
* "Elementary" conversion rules. * "Elementary" conversion rules.
*/ */
case 'A': /* The day of week, using the locale's form. */ case 'A': /* The day of week, using the locale's form. */
case 'a': case 'a':
LEGAL_ALT(0); LEGAL_ALT(0);
for (i = 0; i < 7; i++) { for (i = 0; i < 7; i++) {
/* Full name. */ /* Full name. */
len = strlen(day[i]); len = strlen(day[i]);
if (strncmp(day[i], bp, len) == 0) if (strncmp(day[i], bp, len) == 0) break;
break;
/* Abbreviated name. */
/* Abbreviated name. */ len = strlen(abday[i]);
len = strlen(abday[i]); if (strncmp(abday[i], bp, len) == 0) break;
if (strncmp(abday[i], bp, len) == 0)
break;
} }
/* Nothing matched. */ /* Nothing matched. */
if (i == 7) if (i == 7) return (0);
return (0);
tm->tm_wday = i; tm->tm_wday = i;
bp += len; bp += len;
break; break;
case 'B': /* The month, using the locale's form. */ case 'B': /* The month, using the locale's form. */
case 'b': case 'b':
case 'h': case 'h':
LEGAL_ALT(0); LEGAL_ALT(0);
for (i = 0; i < 12; i++) { for (i = 0; i < 12; i++) {
/* Full name. */ /* Full name. */
len = strlen(mon[i]); len = strlen(mon[i]);
if (strncmp(mon[i], bp, len) == 0) if (strncmp(mon[i], bp, len) == 0) break;
break;
/* Abbreviated name. */
/* Abbreviated name. */ len = strlen(abmon[i]);
len = strlen(abmon[i]); if (strncmp(abmon[i], bp, len) == 0) break;
if (strncmp(abmon[i], bp, len) == 0)
break;
} }
/* Nothing matched. */ /* Nothing matched. */
if (i == 12) if (i == 12) return (0);
return (0);
tm->tm_mon = i; tm->tm_mon = i;
bp += len; bp += len;
break; break;
case 'C': /* The century number. */ case 'C': /* The century number. */
LEGAL_ALT(ALT_E); LEGAL_ALT(ALT_E);
if (!(conv_num(&bp, &i, 0, 99))) if (!(conv_num(&bp, &i, 0, 99))) return (0);
return (0);
if (split_year) { if (split_year) {
tm->tm_year = (tm->tm_year % 100) + (i * 100); tm->tm_year = (tm->tm_year % 100) + (i * 100);
} } else {
else { tm->tm_year = i * 100;
tm->tm_year = i * 100; split_year = 1;
split_year = 1;
} }
break; break;
case 'd': /* The day of month. */ case 'd': /* The day of month. */
case 'e': case 'e':
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_mday, 1, 31))) if (!(conv_num(&bp, &tm->tm_mday, 1, 31))) return (0);
return (0);
break; break;
case 'k': /* The hour (24-hour clock representation). */ case 'k': /* The hour (24-hour clock representation). */
LEGAL_ALT(0); LEGAL_ALT(0);
/* FALLTHROUGH */ /* FALLTHROUGH */
case 'H': case 'H':
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_hour, 0, 23))) if (!(conv_num(&bp, &tm->tm_hour, 0, 23))) return (0);
return (0);
break; break;
case 'l': /* The hour (12-hour clock representation). */ case 'l': /* The hour (12-hour clock representation). */
LEGAL_ALT(0); LEGAL_ALT(0);
/* FALLTHROUGH */ /* FALLTHROUGH */
case 'I': case 'I':
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_hour, 1, 12))) if (!(conv_num(&bp, &tm->tm_hour, 1, 12))) return (0);
return (0); if (tm->tm_hour == 12) tm->tm_hour = 0;
if (tm->tm_hour == 12)
tm->tm_hour = 0;
break; break;
case 'j': /* The day of year. */ case 'j': /* The day of year. */
LEGAL_ALT(0); LEGAL_ALT(0);
if (!(conv_num(&bp, &i, 1, 366))) if (!(conv_num(&bp, &i, 1, 366))) return (0);
return (0);
tm->tm_yday = i - 1; tm->tm_yday = i - 1;
break; break;
case 'M': /* The minute. */ case 'M': /* The minute. */
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_min, 0, 59))) if (!(conv_num(&bp, &tm->tm_min, 0, 59))) return (0);
return (0);
break; break;
case 'm': /* The month. */ case 'm': /* The month. */
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &i, 1, 12))) if (!(conv_num(&bp, &i, 1, 12))) return (0);
return (0);
tm->tm_mon = i - 1; tm->tm_mon = i - 1;
break; break;
case 'p': /* The locale's equivalent of AM/PM. */ case 'p': /* The locale's equivalent of AM/PM. */
LEGAL_ALT(0); LEGAL_ALT(0);
/* AM? */ /* AM? */
if (strcmp(am_pm[0], bp) == 0) { if (strcmp(am_pm[0], bp) == 0) {
if (tm->tm_hour > 11) if (tm->tm_hour > 11) return (0);
return (0);
bp += strlen(am_pm[0]); bp += strlen(am_pm[0]);
break; break;
} }
/* PM? */ /* PM? */
else if (strcmp(am_pm[1], bp) == 0) { else if (strcmp(am_pm[1], bp) == 0) {
if (tm->tm_hour > 11) if (tm->tm_hour > 11) return (0);
return (0);
tm->tm_hour += 12; tm->tm_hour += 12;
bp += strlen(am_pm[1]); bp += strlen(am_pm[1]);
break; break;
} }
/* Nothing matched. */ /* Nothing matched. */
return (0); return (0);
case 'S': /* The seconds. */ case 'S': /* The seconds. */
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_sec, 0, 61))) if (!(conv_num(&bp, &tm->tm_sec, 0, 61))) return (0);
return (0);
break; break;
case 'U': /* The week of year, beginning on sunday. */ case 'U': /* The week of year, beginning on sunday. */
case 'W': /* The week of year, beginning on monday. */ case 'W': /* The week of year, beginning on monday. */
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
/* /*
* XXX This is bogus, as we can not assume any valid * XXX This is bogus, as we can not assume any valid
* information present in the tm structure at this * information present in the tm structure at this
* point to calculate a real value, so just check the * point to calculate a real value, so just check the
* range for now. * range for now.
*/ */
if (!(conv_num(&bp, &i, 0, 53))) if (!(conv_num(&bp, &i, 0, 53))) return (0);
return (0);
break; break;
case 'w': /* The day of week, beginning on sunday. */ case 'w': /* The day of week, beginning on sunday. */
LEGAL_ALT(ALT_O); LEGAL_ALT(ALT_O);
if (!(conv_num(&bp, &tm->tm_wday, 0, 6))) if (!(conv_num(&bp, &tm->tm_wday, 0, 6))) return (0);
return (0);
break; break;
case 'Y': /* The year. */ case 'Y': /* The year. */
LEGAL_ALT(ALT_E); LEGAL_ALT(ALT_E);
if (!(conv_num(&bp, &i, 0, 9999))) if (!(conv_num(&bp, &i, 0, 9999))) return (0);
return (0);
tm->tm_year = i - TM_YEAR_BASE; tm->tm_year = i - TM_YEAR_BASE;
break; break;
case 'y': /* The year within 100 years of the epoch. */ case 'y': /* The year within 100 years of the epoch. */
LEGAL_ALT(ALT_E | ALT_O); LEGAL_ALT(ALT_E | ALT_O);
if (!(conv_num(&bp, &i, 0, 99))) if (!(conv_num(&bp, &i, 0, 99))) return (0);
return (0);
if (split_year) { if (split_year) {
tm->tm_year = ((tm->tm_year / 100) * 100) + i; tm->tm_year = ((tm->tm_year / 100) * 100) + i;
break; break;
} }
split_year = 1; split_year = 1;
if (i <= 68) if (i <= 68)
tm->tm_year = i + 2000 - TM_YEAR_BASE; tm->tm_year = i + 2000 - TM_YEAR_BASE;
else else
tm->tm_year = i + 1900 - TM_YEAR_BASE; tm->tm_year = i + 1900 - TM_YEAR_BASE;
break; break;
/* /*
* Miscellaneous conversions. * Miscellaneous conversions.
*/ */
case 'n': /* Any kind of white-space. */ case 'n': /* Any kind of white-space. */
case 't': case 't':
LEGAL_ALT(0); LEGAL_ALT(0);
while (isspace(*bp)) while (isspace(*bp)) bp++;
bp++;
break; break;
default: /* Unknown/unsupported conversion. */
default: /* Unknown/unsupported conversion. */
return (0); return (0);
} }
}
/* LINTED functional specification */
} return ((char *)bp);
/* LINTED functional specification */
return ((char *)bp);
#else #else
return strptime(buf, fmt, tm); return strptime(buf, fmt, tm);
#endif #endif
} }
...@@ -435,13 +355,9 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { ...@@ -435,13 +355,9 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
#endif #endif
} }
time_t taosTime(time_t *t) { time_t taosTime(time_t *t) { return time(t); }
return time(t);
}
time_t taosMktime(struct tm *timep) { time_t taosMktime(struct tm *timep) { return mktime(timep); }
return mktime(timep);
}
struct tm *taosLocalTime(const time_t *timep, struct tm *result) { struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
if (result == NULL) { if (result == NULL) {
...@@ -456,5 +372,36 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) { ...@@ -456,5 +372,36 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
} }
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { return clock_gettime(clock_id, pTS); } #ifdef WINDOWS
\ No newline at end of file LARGE_INTEGER t;
FILETIME f;
static FILETIME ff;
static SYSTEMTIME ss;
static LARGE_INTEGER offset;
ss.wYear = 1970;
ss.wMonth = 1;
ss.wDay = 1;
ss.wHour = 0;
ss.wMinute = 0;
ss.wSecond = 0;
ss.wMilliseconds = 0;
SystemTimeToFileTime(&ss, &ff);
offset.QuadPart = ff.dwHighDateTime;
offset.QuadPart <<= 32;
offset.QuadPart |= ff.dwLowDateTime;
GetSystemTimeAsFileTime(&f);
t.QuadPart = f.dwHighDateTime;
t.QuadPart <<= 32;
t.QuadPart |= f.dwLowDateTime;
t.QuadPart -= offset.QuadPart;
pTS->tv_sec = t.QuadPart / 10000000;
pTS->tv_nsec = (t.QuadPart % 10000000)*100;
return (0);
#else
return clock_gettime(clock_id, pTS);
#endif
}
\ No newline at end of file
...@@ -321,6 +321,7 @@ class TDDnode: ...@@ -321,6 +321,7 @@ class TDDnode:
self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index)) self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
self.running = 1 self.running = 1
else: else:
os.system("rm -rf %s/taosdlog.0"%self.logDir)
if os.system(cmd) != 0: if os.system(cmd) != 0:
tdLog.exit(cmd) tdLog.exit(cmd)
self.running = 1 self.running = 1
...@@ -338,8 +339,6 @@ class TDDnode: ...@@ -338,8 +339,6 @@ class TDDnode:
if i > 50: if i > 50:
break break
tailCmdStr = 'tail -f ' tailCmdStr = 'tail -f '
if platform.system().lower() == 'windows':
tailCmdStr = 'tail -n +0 -f '
popen = subprocess.Popen( popen = subprocess.Popen(
tailCmdStr + logFile, tailCmdStr + logFile,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
...@@ -574,6 +573,9 @@ class TDDnodes: ...@@ -574,6 +573,9 @@ class TDDnodes:
def stopAll(self): def stopAll(self):
tdLog.info("stop all dnodes") tdLog.info("stop all dnodes")
if (not self.dnodes[0].remoteIP == ""):
self.dnodes[0].remoteExec(self.dnodes[0].cfgDict, "for i in range(len(tdDnodes.dnodes)):\n tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
return
for i in range(len(self.dnodes)): for i in range(len(self.dnodes)):
self.dnodes[i].stop() self.dnodes[i].stop()
......
...@@ -17,7 +17,7 @@ sql use test ...@@ -17,7 +17,7 @@ sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double,id int); sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s); sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1); sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
sql insert into t1 values(1648791223001,10,2,3,1.1,2); sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3); sql insert into t1 values(1648791233002,3,2,3,2.1,3);
...@@ -176,4 +176,107 @@ if $data08 != 13 then ...@@ -176,4 +176,107 @@ if $data08 != 13 then
return -1 return -1
endi endi
sql create database test2 vgroups 1;
sql use test2;
sql create table t2(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams2 trigger at_once watermark 1d into streamt2 as select _wstartts,apercentile(a,30) c1, apercentile(a,70), apercentile(a,20,"t-digest") c2, apercentile(a,60,"t-digest") c3, max(id) c4 from t2 session(ts,10s);
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
sql insert into t2 values(1648791233003,3,4,3,2.1,4);
sql insert into t2 values(1648791233004,3,5,3,3.4,5);
sql insert into t2 values(1648791233005,3,6,3,7.6,6);
#
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2 where c4=7;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
goto loop2
endi
# row 0
if $data01 != 2.091607978 then
print =====data01=$data01
goto loop2
endi
if $data02 != 3.274823935 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1.800000000 then
print ======$data03
return -1
endi
if $data04 != 3.350000000 then
print ======$data04
return -1
endi
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once watermark 1d into streamt3 as select _wstartts, min(b), a,c from t1 session(ts,10s);
sql create stream streams4 trigger at_once watermark 1d into streamt4 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams5 trigger at_once watermark 1d into streamt5 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, max(b), a,c from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8);
sql insert into t1 values(1648791213004,4,5,4,4.8);
sql insert into t1 values(1648791233004,3,4,0,2.1);
sql insert into t1 values(1648791233005,3,0,6,3.4);
sql insert into t1 values(1648791233006,3,6,7,7.6);
sql insert into t1 values(1648791233007,3,13,8,7.6);
sql insert into t1 values(1648791223004,20,7,9,10.1);
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt3;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt4;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt5;
if $rows == 0 then
print ======$rows
goto loop3
endi
sql select * from streamt6;
if $rows == 0 then
print ======$rows
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -23,7 +23,7 @@ class TDTestCase: ...@@ -23,7 +23,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -37,7 +37,7 @@ class TDTestCase: ...@@ -37,7 +37,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -9,8 +9,9 @@ from util.sql import * ...@@ -9,8 +9,9 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
import subprocess import subprocess
# import win32gui if (platform.system().lower() == 'windows'):
# import threading import win32gui
import threading
class TDTestCase: class TDTestCase:
...@@ -535,17 +536,18 @@ class TDTestCase: ...@@ -535,17 +536,18 @@ class TDTestCase:
return udf1_sqls ,udf2_sqls return udf1_sqls ,udf2_sqls
# def checkRunTimeError(self): def checkRunTimeError(self):
# while 1: if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""):
# time.sleep(1) while 1:
# hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") time.sleep(1)
# if hwnd: hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
# os.system("TASKKILL /F /IM udfd.exe") if hwnd:
os.system("TASKKILL /F /IM udfd.exe")
def unexpected_create(self): def unexpected_create(self):
# if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""): if (platform.system().lower() == 'windows' and tdDnodes.dnodes[0].remoteIP == ""):
# checkErrorThread = threading.Thread(target=self.checkRunTimeError,daemon=True) checkErrorThread = threading.Thread(target=self.checkRunTimeError,daemon=True)
# checkErrorThread.start() checkErrorThread.start()
tdLog.info(" create function with out bufsize ") tdLog.info(" create function with out bufsize ")
tdSql.query("drop function udf1 ") tdSql.query("drop function udf1 ")
......
...@@ -3,6 +3,7 @@ import taos ...@@ -3,6 +3,7 @@ import taos
import time import time
import inspect import inspect
import traceback import traceback
import socket
from dataclasses import dataclass from dataclasses import dataclass
from util.log import * from util.log import *
...@@ -102,7 +103,7 @@ class TDconnect: ...@@ -102,7 +103,7 @@ class TDconnect:
def taos_connect( def taos_connect(
host = "127.0.0.1", host = socket.gethostname(),
port = 6030, port = 6030,
user = "root", user = "root",
passwd = "taosdata", passwd = "taosdata",
......
...@@ -54,7 +54,7 @@ class TDTestCase: ...@@ -54,7 +54,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[:len(root)-len("/build/bin")]
......
...@@ -52,7 +52,7 @@ class TDTestCase: ...@@ -52,7 +52,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[:len(root)-len("/build/bin")]
......
...@@ -49,7 +49,7 @@ class TDTestCase: ...@@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[:len(root)-len("/build/bin")]
......
...@@ -49,7 +49,7 @@ class TDTestCase: ...@@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[:len(root)-len("/build/bin")]
......
...@@ -49,7 +49,7 @@ class TDTestCase: ...@@ -49,7 +49,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")] buildPath = root[:len(root)-len("/build/bin")]
......
...@@ -400,7 +400,7 @@ class TDTestCase: ...@@ -400,7 +400,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -3,7 +3,10 @@ from util.log import * ...@@ -3,7 +3,10 @@ from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
import platform
import os import os
if platform.system().lower() == 'windows':
import tzlocal
class TDTestCase: class TDTestCase:
...@@ -15,16 +18,20 @@ class TDTestCase: ...@@ -15,16 +18,20 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare() tdSql.prepare()
# get system timezone # get system timezone
time_zone_arr = os.popen('timedatectl | grep zone').read( if platform.system().lower() == 'windows':
).strip().split(':') time_zone_1 = tzlocal.get_localzone_name()
if len(time_zone_arr) > 1: time_zone_2 = time.strftime('(UTC, %z)')
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
time_zone = time_zone_1 + " " + time_zone_2 time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone) else:
time_zone_arr = os.popen('timedatectl | grep zone').read().strip().split(':')
if len(time_zone_arr) > 1:
time_zone = time_zone_arr[1].lstrip()
else:
# possibly in a docker container
time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
time_zone = time_zone_1 + " " + time_zone_2
print("expected time zone: " + time_zone)
tdLog.printNoPrefix("==========step1:create tables==========") tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute( tdSql.execute(
......
...@@ -13,6 +13,12 @@ from util.dnodes import * ...@@ -13,6 +13,12 @@ from util.dnodes import *
class TDTestCase: class TDTestCase:
hostname = socket.gethostname() hostname = socket.gethostname()
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP)
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
#rpcDebugFlagVal = '143' #rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
...@@ -34,7 +40,7 @@ class TDTestCase: ...@@ -34,7 +40,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
...@@ -192,7 +198,10 @@ class TDTestCase: ...@@ -192,7 +198,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -306,7 +315,10 @@ class TDTestCase: ...@@ -306,7 +315,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
...@@ -438,7 +450,10 @@ class TDTestCase: ...@@ -438,7 +450,10 @@ class TDTestCase:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &" if (platform.system().lower() == 'windows'):
shellCmd += "> nul 2>&1 &"
else:
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -33,7 +33,7 @@ class TDTestCase: ...@@ -33,7 +33,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -34,7 +34,7 @@ class TDTestCase: ...@@ -34,7 +34,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
...@@ -41,7 +41,7 @@ class TDTestCase: ...@@ -41,7 +41,7 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
......
python3 .\test.py -f 0-others\taosShell.py @REM python3 .\test.py -f 0-others\taosShell.py
python3 .\test.py -f 0-others\taosShellError.py @REM python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
@REM python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py python3 .\test.py -f 0-others\udf_create.py
python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\cachelast.py
@REM python3 .\test.py -f 0-others\user_control.py @REM python3 .\test.py -f 0-others\user_control.py
......
...@@ -2,14 +2,7 @@ ...@@ -2,14 +2,7 @@
SETLOCAL EnableDelayedExpansion SETLOCAL EnableDelayedExpansion
for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a") for /F "tokens=1,2 delims=#" %%a in ('"prompt #$H#$E# & echo on & for %%b in (1) do rem"') do ( set "DEL=%%a")
set /a a=0 set /a a=0
@REM echo Windows Taosd Test echo Windows Taosd Test
@REM for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
@REM echo Processing %%i
@REM set /a a+=1
@REM call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
@REM if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@REM )
echo Linux Taosd Test
for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
echo Processing %%i echo Processing %%i
...@@ -17,10 +10,22 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( ...@@ -17,10 +10,22 @@ for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
set time1=!_timeTemp! set time1=!_timeTemp!
echo Start at %time% echo Start at %time%
set /a a+=1 set /a a+=1
call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt call %%i ARG1 > result_!a!.txt 2>error_!a!.txt
if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. ) if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
) )
) )
@REM echo Linux Taosd Test
@REM for /F "usebackq tokens=*" %%i in (fulltest.bat) do (
@REM for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" (
@REM echo Processing %%i
@REM call :GetTimeSeconds %time%
@REM set time1=!_timeTemp!
@REM echo Start at %time%
@REM set /a a+=1
@REM call %%i ARG1 -m %1 > result_!a!.txt 2>error_!a!.txt
@REM if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && echo result: && cat result_!a!.txt && echo error: && cat error_!a!.txt && exit 8 ) else ( call :colorEcho 0a "Success" &echo. )
@REM )
@REM )
exit exit
:colorEcho :colorEcho
......
...@@ -20,6 +20,7 @@ import time ...@@ -20,6 +20,7 @@ import time
import base64 import base64
import json import json
import platform import platform
import socket
from distutils.log import warn as printf from distutils.log import warn as printf
from fabric2 import Connection from fabric2 import Connection
sys.path.append("../pytest") sys.path.append("../pytest")
...@@ -149,7 +150,7 @@ if __name__ == "__main__": ...@@ -149,7 +150,7 @@ if __name__ == "__main__":
tdLog.info('stop All dnodes') tdLog.info('stop All dnodes')
if masterIp == "": if masterIp == "":
host = '127.0.0.1' host = socket.gethostname()
else: else:
try: try:
config = eval(masterIp) config = eval(masterIp)
...@@ -170,8 +171,8 @@ if __name__ == "__main__": ...@@ -170,8 +171,8 @@ if __name__ == "__main__":
try: try:
if key_word in open(fileName, encoding='UTF-8').read(): if key_word in open(fileName, encoding='UTF-8').read():
is_test_framework = 1 is_test_framework = 1
except: except Exception as r:
pass print(r)
updateCfgDictStr = '' updateCfgDictStr = ''
if is_test_framework: if is_test_framework:
moduleName = fileName.replace(".py", "").replace(os.sep, ".") moduleName = fileName.replace(".py", "").replace(os.sep, ".")
...@@ -181,8 +182,8 @@ if __name__ == "__main__": ...@@ -181,8 +182,8 @@ if __name__ == "__main__":
if ((json.dumps(updateCfgDict) == '{}') and (ucase.updatecfgDict is not None)): if ((json.dumps(updateCfgDict) == '{}') and (ucase.updatecfgDict is not None)):
updateCfgDict = ucase.updatecfgDict updateCfgDict = ucase.updatecfgDict
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode() updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode()
except : except Exception as r:
pass print(r)
else: else:
pass pass
tdDnodes.deploy(1,updateCfgDict) tdDnodes.deploy(1,updateCfgDict)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册