提交 dc02d8d9 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

......@@ -45,15 +45,15 @@ Add the [libtaos][libtaos] dependency to the [Rust](https://rust-lang.org) proje
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
Add [libtaos][libtaos] to the ``Cargo.toml`'' file.
Add [libtaos][libtaos] to the `Cargo.toml` file.
``toml
```toml
[dependencies]
# use default feature
libtaos = "*"
```
</TabItem
</TabItem>
<TabItem value="rest" label="REST connection">
Add [libtaos][libtaos] to the `Cargo.toml` file and enable the `rest` feature.
......
......@@ -50,6 +50,7 @@ typedef enum EStreamType {
STREAM_INVERT,
STREAM_REPROCESS,
STREAM_INVALID,
STREAM_GET_ALL,
} EStreamType;
typedef struct {
......
......@@ -1496,6 +1496,7 @@ typedef struct {
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......
......@@ -689,16 +689,15 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
} else {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......
......@@ -778,9 +778,9 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use test");
taos_query(pConn, "use nest");
TAOS_RES* pRes = taos_query(pConn, "desc abc1.tu");
TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
}
......
......@@ -294,7 +294,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows) {
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
if (numOfRows == 0) {
if (numOfRows <= 0) {
return numOfRows;
}
......@@ -1239,6 +1239,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
return NULL;
}
if (pSrc->pData == NULL) {
continue;
}
colDataAssign(pDst, pSrc, pDataBlock->info.rows);
}
......
......@@ -38,9 +38,13 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer,
pMgmt->pData->dnodeVer);
if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
}
}
rpcFreeCont(pRsp->pCont);
tFreeSStatusRsp(&statusRsp);
......@@ -89,7 +93,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode");
dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer);
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
......
......@@ -81,6 +81,13 @@ int32_t dmReadEps(SDnodeData *pData) {
}
pData->dnodeId = dnodeId->valueint;
cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer");
if (!dnodeVer || dnodeVer->type != cJSON_String) {
dError("failed to read %s since dnodeVer not found", file);
goto _OVER;
}
pData->dnodeVer = atoll(dnodeVer->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file);
......@@ -193,6 +200,7 @@ int32_t dmWriteEps(SDnodeData *pData) {
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeVer\": \"%" PRId64 "\",\n", pData->dnodeVer);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
......@@ -224,30 +232,15 @@ int32_t dmWriteEps(SDnodeData *pData) {
}
pData->updateTime = taosGetTimestampMs();
dDebug("successed to write %s", realfile);
dDebug("successed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
return 0;
}
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
int32_t numOfEps = taosArrayGetSize(eps);
if (numOfEps <= 0) return;
taosThreadRwlockWrlock(&pData->lock);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
if (numOfEps != numOfEpsOld) {
dDebug("new dnode list get from mnode");
dmResetEps(pData, eps);
dmWriteEps(pData);
} else {
int32_t size = numOfEps * sizeof(SDnodeEp);
if (memcmp(pData->dnodeEps->pData, eps->pData, size) != 0) {
dDebug("new dnode list get from mnode");
dmResetEps(pData, eps);
dmWriteEps(pData);
}
}
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
dmResetEps(pData, eps);
dmWriteEps(pData);
taosThreadRwlockUnlock(&pData->lock);
}
......
......@@ -447,6 +447,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
mInfo("receive subscribe request from new consumer: %ld", consumerId);
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
......@@ -463,7 +465,12 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} else {
/*taosRLockLatch(&pConsumerOld->lock);*/
int32_t status = atomic_load_32(&pConsumerOld->status);
mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId,
mndConsumerStatusName(status));
if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto SUBSCRIBE_OVER;
......
......@@ -385,7 +385,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
......@@ -427,7 +427,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id);
} else {
mDebug("dnode:%d, send dnode epset, online:%d dnode_ver:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
mDebug("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
statusReq.dnodeVer, dnodeVer, reboot);
}
......
......@@ -183,7 +183,10 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
mInfo("commit offset %ld to vg %d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId,
pOffset->cgroup, pOffset->topicName);
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
mError("submit offset to topic %s failed", pOffset->topicName);
return -1;
}
bool create = false;
......@@ -192,7 +195,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
continue;
}
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
......
......@@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pMnode->pSdb, pDb);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
break;
}
ASSERT(0);
return -1;
}
sdbRelease(pMnode->pSdb, pDb);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
ASSERT(pVgInfo->taskId != 0);
break;
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
return 0;
}
......@@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// dispatch
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask);
if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
// exec
pFinalTask->execType = TASK_EXEC__PIPE;
......
......@@ -200,14 +200,14 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1;
}
mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0;
}
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncStop(pMgmt->sync);
mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync);
mDebug("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
tsem_destroy(&pMgmt->syncSem);
memset(pMgmt, 0, sizeof(SSyncMgmt));
......
......@@ -297,7 +297,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
// mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
......@@ -330,7 +330,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
// mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
......@@ -363,7 +363,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
// mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
......
......@@ -163,7 +163,12 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) { pSdb->curConfig = config; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
if (pSdb->curConfig != config) {
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config);
pSdb->curConfig = config;
}
}
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
......
......@@ -432,8 +432,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
} else {
pSdb->lastCommitVer = pSdb->curVer;
pSdb->lastCommitTerm = pSdb->curTerm;
mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
pSdb->lastCommitTerm, curfile);
mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile);
}
terrno = code;
......
......@@ -130,7 +130,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(pHandle);
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
pReq->subKey);
return -1;
}
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
return -1;
}
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) {
......
......@@ -759,7 +759,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
......
......@@ -1819,9 +1819,9 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
}
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
if (pFilterNode == NULL) {
return;
}
......@@ -1840,30 +1840,29 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep, needFree);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
blockDataUpdateTsWindow(pBlock, 0);
}
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree) {
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
if (keep) {
return;
}
if (rowRes != NULL) {
SSDataBlock* px = createOneDataBlock(pBlock, false);
blockDataEnsureCapacity(px, pBlock->info.rows);
SSDataBlock* px = createOneDataBlock(pBlock, true);
int32_t totalRows = pBlock->info.rows;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet.
if (pSrc->pData == NULL) {
if (pDst->pData == NULL) {
continue;
}
colInfoDataCleanup(pDst, pBlock->info.rows);
int32_t numOfRows = 0;
for (int32_t j = 0; j < totalRows; ++j) {
if (rowRes[j] == 0) {
......@@ -1883,20 +1882,8 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
} else {
ASSERT(pBlock->info.rows == numOfRows);
}
SColumnInfoData tmp = *pSrc;
*pSrc = *pDst;
*pDst = tmp;
if (!needFree) {
if (IS_VAR_DATA_TYPE(pDst->info.type)) { // this elements do not need free
pDst->varmeta.offset = NULL;
} else {
pDst->nullbitmap = NULL;
}
pDst->pData = NULL;
}
}
blockDataDestroy(px); // fix memory leak
} else {
// do nothing
......@@ -2137,11 +2124,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
}
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
// p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
// }
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
pBlock->info.rows += numOfRows;
......@@ -2896,7 +2878,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
// return concurrentlyLoadRemoteData(pOperator);
}
}
......@@ -2922,9 +2903,14 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) {
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
if (numOfSources == 0) {
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources);
return TSDB_CODE_INVALID_PARA;
}
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
......@@ -2946,7 +2932,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error;
}
int32_t code = initExchangeOperator(pExNode, pInfo);
int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -2975,7 +2961,7 @@ _error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = code;
return NULL;
}
......@@ -3744,7 +3730,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
doFilter(pProjectInfo->pFilterNode, pBlock, true);
doFilter(pProjectInfo->pFilterNode, pBlock);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
......@@ -5263,7 +5249,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
code = terrno;
code = (*pTaskInfo)->code;
goto _complete;
}
......@@ -5276,7 +5262,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
_complete:
taosMemoryFreeClear(*pTaskInfo);
terrno = code;
return code;
}
......
......@@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, true);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
......
......@@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, false);
doFilter(pTableScanInfo->pFilterNode, pBlock);
int64_t et = taosGetTimestampMs();
pTableScanInfo->readRecorder.filterTime += (et - st);
......@@ -948,7 +948,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
}
doFilter(pInfo->pCondition, pInfo->pRes, false);
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
break;
}
......@@ -1716,7 +1716,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
pRes->info.rows = count;
doFilter(pInfo->pFilterNode, pRes, true);
doFilter(pInfo->pFilterNode, pRes);
pOperator->resultInfo.totalRows += pRes->info.rows;
......
......@@ -296,7 +296,10 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
}
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
......@@ -354,6 +357,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
}
blockDataDestroy(p);
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
......@@ -371,7 +376,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
}
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo);
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
......
......@@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) {
return *(int64_t*)pos->key;
}
static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId,
SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey);
int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey);
if (index == -1) {
index = 0;
} else {
TSKEY resTs = getReskey(pUpdated, index);
if (resTs < result->win.skey) {
if (resTs < ts) {
index++;
} else {
return TSDB_CODE_SUCCESS;
......@@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset};
*(int64_t*)newPos->key = result->win.skey;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
if (taosArrayInsert(pUpdated, index, &newPos) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
}
static void removeResult(SArray* pUpdated, TSKEY key) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
......@@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
}
......@@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
}
......@@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva
}
}
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*)key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SArray* closeWins) {
void* pIte = NULL;
......@@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = groupId;
pos->pos = *(SResultRowPosition*)pIte;
*(int64_t*)pos->key = ts;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
......@@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
......@@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue;
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
}
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
taosArrayAddAll(pUpdated, pClosed);
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
......@@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResult(pResult, tableGroupId, pUpdated);
saveResultRow(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
......@@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
taosArrayDestroy(pUpWins);
break;
} else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
continue;
}
if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
......@@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
}
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
......
......@@ -227,6 +227,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source id done
if (pSource->src.pBlock == NULL) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
......@@ -426,8 +428,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed);
if (sortPass > 0) {
size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
} else {
qDebug("%s ordered source:%"PRIzu", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
pHandle->numOfPages);
}
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
......
......@@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
qInfo("dispatch from task %d (child id %d)", pTask->taskId, pTask->childId);
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
......@@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
break;
}
}
ASSERT(vgId != 0);
}
ASSERT(vgId != 0);
req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
downstreamTaskId, vgId);
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
......
......@@ -122,10 +122,10 @@ class TDTestCase:
for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']:
for j in [1,2,3]:
tdSql.error(f'alter table {stbname} modify tag t{j} {i}')
tdSql.error(f'alter stable {stbname} modify tag t{j} {i}')
for i in ['int','unsigned int','float','binary(10)','nchar(10)']:
tdSql.error(f'alter table {stbname} modify tag t8 {i}')
tdSql.error(f'alter table {stbname} modify tag t4 int')
tdSql.error(f'alter stable {stbname} modify tag t8 {i}')
tdSql.error(f'alter stable {stbname} modify tag t4 int')
tdSql.execute(f'drop database {dbname}')
def run(self):
......
......@@ -145,10 +145,17 @@ class TDTestCase:
tdSql.execute(f'alter table {dbname}.{tbname} rename column c1 c21')
tdSql.query(f'describe {dbname}.{tbname}')
tdSql.checkData(1,0,'c21')
# !bug TD-16423
# tdSql.error(f'select c1 from {dbname}.{tbname}')
# tdSql.query(f'select c21 from {dbname}.{tbname}')
# tdSql.checkData(0,1,1)
tdSql.execute(f'alter table {dbname}.{tbname} rename column `c21` c1')
tdSql.query(f'describe {dbname}.{tbname}')
tdSql.checkData(1,0,'c1')
# !bug TD-16423
# tdSql.error(f'select c1 from {dbname}.{tbname}')
# tdSql.query(f'select c1 from {dbname}.{tbname}')
# tdSql.checkData(0,1,1)
tdSql.error(f'alter table {dbname}.{tbname} modify column c1 bigint')
tdSql.error(f'alter table {dbname}.{tbname} modify column c1 double')
tdSql.error(f'alter table {dbname}.{tbname} modify column c4 int')
......@@ -158,9 +165,109 @@ class TDTestCase:
tdSql.error(f'alter table {dbname}.{tbname} modify column c1 bool')
tdSql.error(f'alter table {dbname}.{tbname} modify column c1 binary(10)')
tdSql.execute(f'drop database {dbname}')
def alter_stb_column_check(self):
dbname = self.get_long_name(length=10, mode="letters")
tdSql.execute(f'create database if not exists {dbname}')
stbname = self.get_long_name(length=3, mode="letters")
tbname = self.get_long_name(length=3, mode="letters")
tdSql.execute(f'create database if not exists {dbname}')
tdSql.execute(f'use {dbname}')
tdSql.execute(
f'create table {stbname} (ts timestamp, c1 tinyint, c2 smallint, c3 int, \
c4 bigint, c5 tinyint unsigned, c6 smallint unsigned, c7 int unsigned, c8 bigint unsigned, c9 float, c10 double, c11 bool,c12 binary(20),c13 nchar(20)) tags(t0 int) ')
tdSql.execute(f'create table {tbname} using {stbname} tags(1)')
tdSql.execute(f'insert into {tbname} values (now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
tdSql.execute(f'alter table {stbname} add column c14 int')
tdSql.query(f'select c14 from {stbname}')
tdSql.checkRows(1)
tdSql.execute(f'alter table {stbname} add column `c15` int')
tdSql.query(f'select c15 from {stbname}')
tdSql.checkRows(1)
tdSql.query(f'describe {stbname}')
tdSql.checkRows(17)
tdSql.execute(f'alter table {stbname} drop column c14')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(16)
tdSql.execute(f'alter table {stbname} drop column `c15`')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(15)
tdSql.execute(f'alter table {stbname} modify column c12 binary(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(12,2,30)
tdSql.execute(f'alter table {stbname} modify column `c12` binary(35)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(12,2,35)
tdSql.error(f'alter table {stbname} modify column `c12` binary(34)')
tdSql.execute(f'alter table {stbname} modify column c13 nchar(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(13,2,30)
tdSql.error(f'alter table {stbname} modify column c13 nchar(29)')
tdSql.error(f'alter table {stbname} rename column c1 c21')
tdSql.error(f'alter table {stbname} modify column c1 int')
tdSql.error(f'alter table {stbname} modify column c4 int')
tdSql.error(f'alter table {stbname} modify column c8 int')
tdSql.error(f'alter table {stbname} modify column c1 unsigned int')
tdSql.error(f'alter table {stbname} modify column c9 double')
tdSql.error(f'alter table {stbname} modify column c10 float')
tdSql.error(f'alter table {stbname} modify column c11 int')
tdSql.execute(f'drop database {dbname}')
def alter_stb_tag_check(self):
dbname = self.get_long_name(length=10, mode="letters")
tdSql.execute(f'create database if not exists {dbname}')
stbname = self.get_long_name(length=3, mode="letters")
tbname = self.get_long_name(length=3, mode="letters")
tdSql.execute(f'create database if not exists {dbname}')
tdSql.execute(f'use {dbname}')
tdSql.execute(
f'create table {stbname} (ts timestamp, c1 int) tags(ts_tag timestamp, t1 tinyint, t2 smallint, t3 int, \
t4 bigint, t5 tinyint unsigned, t6 smallint unsigned, t7 int unsigned, t8 bigint unsigned, t9 float, t10 double, t11 bool,t12 binary(20),t13 nchar(20)) ')
tdSql.execute(f'create table {tbname} using {stbname} tags(now,1,2,3,4,5,6,7,8,9.9,10.1,true,"abcd","涛思数据")')
tdSql.execute(f'insert into {tbname} values(now,1)')
tdSql.execute(f'alter table {stbname} add tag t14 int')
tdSql.query(f'select t14 from {stbname}')
tdSql.checkRows(1)
tdSql.execute(f'alter table {stbname} add tag `t15` int')
tdSql.query(f'select t14 from {stbname}')
tdSql.checkRows(1)
tdSql.query(f'describe {stbname}')
tdSql.checkRows(18)
tdSql.execute(f'alter table {stbname} drop tag t14')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(17)
tdSql.execute(f'alter table {stbname} drop tag `t15`')
tdSql.query(f'describe {stbname}')
tdSql.checkRows(16)
tdSql.execute(f'alter table {stbname} modify tag t12 binary(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(14,2,30)
tdSql.execute(f'alter table {stbname} modify tag `t12` binary(35)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(14,2,35)
tdSql.error(f'alter table {stbname} modify tag `t12` binary(34)')
tdSql.execute(f'alter table {stbname} modify tag t13 nchar(30)')
tdSql.query(f'describe {stbname}')
tdSql.checkData(15,2,30)
tdSql.error(f'alter table {stbname} modify tag t13 nchar(29)')
tdSql.execute(f'alter table {stbname} rename tag t1 t21')
tdSql.query(f'describe {stbname}')
tdSql.checkData(3,0,'t21')
tdSql.execute(f'alter table {stbname} rename tag `t21` t1')
tdSql.query(f'describe {stbname}')
tdSql.checkData(3,0,'t1')
for i in ['bigint','unsigned int','float','double','binary(10)','nchar(10)']:
for j in [1,2,3]:
tdSql.error(f'alter table {stbname} modify tag t{j} {i}')
for i in ['int','unsigned int','float','binary(10)','nchar(10)']:
tdSql.error(f'alter table {stbname} modify tag t8 {i}')
tdSql.error(f'alter table {stbname} modify tag t4 int')
tdSql.execute(f'drop database {dbname}')
def run(self):
self.alter_tb_tag_check()
self.alter_ntb_column_check()
self.alter_stb_column_check()
self.alter_stb_tag_check()
def stop(self):
tdSql.close()
......
......@@ -3,7 +3,7 @@ from time import sleep
from util.log import *
from util.sql import *
from util.cases import *
import os
......@@ -12,28 +12,47 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare()
# get system timezone
today_date = datetime.datetime.strptime(
datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
self.rowNum = 10
self.ts = 1640966400000 # 2022-1-1 00:00:00.000
def check_customize_param_ms(self):
time_zone = os.popen('date "+%z"').read().strip()
tdSql.execute('create database db1 precision "ms"')
tdSql.execute('use db1')
tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)')
for i in range(self.rowNum):
tdSql.execute("insert into ntb values(%d, %d, %d)"
% (self.ts + i, i + 1, self.ts + i))
tdSql.query('select to_iso8601(ts) from ntb')
for i in range(self.rowNum):
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}')
timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\
'+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\
'+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\
'-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\
'-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\
'-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00',\
'z','Z']
for j in timezone_list:
tdSql.query(f'select to_iso8601(ts,"{j}") from ntb')
for i in range(self.rowNum):
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{j}')
error_param_list = [0,100.5,'a','!']
for i in error_param_list:
tdSql.error(f'select to_iso8601(ts,"{i}") from ntb')
#! bug TD-16372:对于错误的时区,缺少校验
error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','-0530']
for i in error_timezone_param:
tdSql.error(f'select to_iso8601(ts,"{i}") from ntb')
def check_base_function(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute(
'''create table if not exists ntb
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp)
'''
)
tdSql.execute(
'''create table if not exists stb
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int)
'''
)
tdSql.execute(
'''create table if not exists stb_1 using stb tags(100)
'''
)
tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp)')
tdSql.execute('create table if not exists stb(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int)')
tdSql.execute('create table if not exists stb_1 using stb tags(100)')
tdLog.printNoPrefix("==========step2:insert data==========")
tdSql.execute('insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
......@@ -48,12 +67,9 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.query("select to_iso8601(ts) from ntb where ts=today()")
tdSql.checkRows(1)
# tdSql.checkData(0,0,10)
for i in range(1,10):
for i in range(0,3):
tdSql.query("select to_iso8601(1) from ntb")
tdSql.checkData(0,0,"1970-01-01T08:00:01+0800")
i+=1
sleep(0.2)
tdSql.checkData(i,0,"1970-01-01T08:00:01+0800")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts) from ntb")
tdSql.checkRows(3)
......@@ -67,53 +83,34 @@ class TDTestCase:
tdSql.error("select to_iso8601(timezone()) from ntb")
tdSql.error("select to_iso8601('abc') from ntb")
tdSql.query("select to_iso8601(today()) *null from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) +null from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) -null from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) /null from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) *null from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) +null from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) -null from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) /null from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
# tdSql.query("select to_iso8601(-1) from ntb")
for i in ['+','-','*','/']:
tdSql.query(f"select to_iso8601(today()) {i}null from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query(f"select to_iso8601(today()) {i}null from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(9223372036854775807) from ntb")
tdSql.checkRows(3)
# bug TD-14896
# bug TD-15207
# tdSql.query("select to_iso8601(10000000000) from ntb")
# tdSql.checkData(0,0,None)
# tdSql.query("select to_iso8601(-1) from ntb")
# tdSql.checkRows(3)
# tdSql.query("select to_iso8601(-10000000000) from ntb")
# tdSql.checkData(0,0,None)
tdSql.error("select to_iso8601(1.5) from ntb")
tdSql.error("select to_iso8601(1.5) from db.ntb")
tdSql.error("select to_iso8601('a') from ntb")
tdSql.error("select to_iso8601(c2) from ntb")
err_param = [1.5,'a','c2']
for i in err_param:
tdSql.error(f"select to_iso8601({i}) from ntb")
tdSql.error(f"select to_iso8601({i}) from db.ntb")
tdSql.query("select to_iso8601(now) from stb")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(now()) from stb")
tdSql.checkRows(3)
for i in range(1,10):
tdSql.query("select to_iso8601(1) from stb")
tdSql.checkData(0,0,"1970-01-01T08:00:01+0800")
i+=1
sleep(0.2)
tdSql.query("select to_iso8601(1) from stb")
for i in range(0,3):
tdSql.checkData(i,0,"1970-01-01T08:00:01+0800")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts) from stb")
tdSql.checkRows(3)
......@@ -121,37 +118,17 @@ class TDTestCase:
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts)+'a' from stb ")
tdSql.checkRows(3)
for i in ['+','-','*','/']:
tdSql.query(f"select to_iso8601(today()) {i}null from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query(f"select to_iso8601(today()) {i}null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) *null from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) +null from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) -null from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) /null from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) *null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) +null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) -null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.query("select to_iso8601(today()) /null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
# bug TD-14896
# tdSql.query("select to_iso8601(-1) from ntb")
# tdSql.checkRows(3)
def run(self): # sourcery skip: extract-duplicate-method
self.check_base_function()
self.check_customize_param_ms()
def stop(self):
tdSql.close()
......
......@@ -65,6 +65,60 @@ class TDTestCase:
'''
)
def prepare_tag_datas(self):
# prepare datas
tdSql.execute("create database if not exists testdb keep 3650 days 1000")
tdSql.execute(" use testdb ")
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t0 timestamp, t1 int, t2 bigint, t3 smallint, t4 tinyint, t5 float, t6 double, t7 bool, t8 binary(16),t9 nchar(32))
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( now(), {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, "binary{i}", "nchar{i}" )')
for i in range(9):
tdSql.execute(
f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute(
f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )"
)
tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )")
tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )")
tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ")
tdSql.execute(
f'''insert into t1 values
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def check_result_auto(self ,origin_query , abs_query):
abs_result = tdSql.getResult(abs_query)
origin_result = tdSql.getResult(origin_query)
......@@ -95,6 +149,7 @@ class TDTestCase:
tdLog.info("abs value check pass , it work as expected ,sql is \"%s\" "%abs_query )
def test_errors(self):
tdSql.execute("use testdb")
error_sql_lists = [
"select abs from t1",
# "select abs(-+--+c1) from t1",
......@@ -129,11 +184,16 @@ class TDTestCase:
tdSql.error(error_sql)
def support_types(self):
tdSql.execute("use testdb")
type_error_sql_lists = [
"select abs(ts) from t1" ,
"select abs(t0) from t1" ,
"select abs(c7) from t1",
"select abs(c8) from t1",
"select abs(c9) from t1",
"select abs(t7) from t1",
"select abs(t8) from t1",
"select abs(t9) from t1",
"select abs(ts) from ct1" ,
"select abs(c7) from ct1",
"select abs(c8) from ct1",
......@@ -171,6 +231,13 @@ class TDTestCase:
"select abs(c5) from t1",
"select abs(c6) from t1",
"select abs(t1) from ct1",
"select abs(t2) from ct1",
"select abs(t3) from ct1",
"select abs(t4) from ct1",
"select abs(t5) from ct1",
"select abs(t6) from ct1",
"select abs(c1) from ct1",
"select abs(c2) from ct1",
"select abs(c3) from ct1",
......@@ -447,6 +514,38 @@ class TDTestCase:
self.check_result_auto("select c1+1 ,c2 , c3*1 , c4/2, c5/2, c6 from sub1_bound" ,"select abs(c1+1) ,abs(c2) , abs(c3*1) , abs(c4/2), abs(c5)/2, abs(c6) from sub1_bound ")
def test_tag_compute_for_scalar_function(self):
tdSql.execute("use testdb")
self.check_result_auto( "select c1, t2, t3 , t4, t5 from ct4 ", "select (c1), abs(t2) ,abs(t3), abs(t4), abs(t5) from ct4")
self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from ct4 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from ct4")
self.check_result_auto( "select c1+2, t2+2, t3 , t4, t5 from stb1 order by t1 ", "select (c1)+2, abs(t2)+2 ,abs(t3), abs(t4), abs(t5) from stb1 order by t1")
# bug need fix
# tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ") # taosd crash
tdSql.query("select c1 ,t1 from stb1 where t1 =0 ")
tdSql.checkRows(13)
# tdSql.query("select t1 from stb1 where t1 >0 ")
# tdSql.checkRows(3)
# tdSql.query("select sum(t1) from (select c1 ,t1 from stb1)")
# tdSql.checkData(0,0,61)
# tdSql.query("select distinct(c1) ,t1 from stb1")
# tdSql.checkRows(11)
# tdSql.query("select max(t2) , t1 ,c1, t2 from stb1")
# tdSql.checkData(0,3,33333)
# tag filter with abs function
# tdSql.query("select t1 from stb1 where abs(t1)=1")
# tdSql.checkRows(1)
tdSql.query("select t1 from stb1 where abs(c1+t1)=1")
tdSql.checkRows(1)
# tdSql.query("select t1 from stb1 where abs(t1+c1)=1")
# tdSql.checkRows(1)
tdSql.query("select abs(c1+t1)*t1 from stb1 where abs(c1)/floor(abs(ceil(t1))) ==1")
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
......@@ -454,6 +553,7 @@ class TDTestCase:
tdLog.printNoPrefix("==========step1:create table ==============")
self.prepare_datas()
self.prepare_tag_datas()
tdLog.printNoPrefix("==========step2:test errors ==============")
......@@ -475,6 +575,10 @@ class TDTestCase:
self.abs_func_filter()
tdLog.printNoPrefix("==========step6: tag coumpute query ============")
self.test_tag_compute_for_scalar_function()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -98,7 +98,7 @@ python3 ./test.py -f 2-query/statecount.py
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
python3 ./test.py -f 7-tmq/subscribeDb0.py
#python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py
......
Subproject commit 932da0f4cac013c2eded824d1d4d01cfa6168fa3
Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册