提交 ead04964 编写于 作者: H Haojun Liao

Merge branch 'main' into fix/liaohj

cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
......
......@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.1.0.1.alpha")
SET(TD_VER_NUMBER "3.1.0.2.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
### Create a Stream
```sql
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);
create stream current_stream trigger at_once into current_stream_output_stb as select _wstart as wstart, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);
```
### Write Data
......
......@@ -178,6 +178,7 @@ The following list shows all reserved keywords:
- MATCH
- MAX_DELAY
- MAX_SPEED
- MAXROWS
- MERGE
- META
......
......@@ -81,6 +81,14 @@ For example:
taos -h h1.taos.com -s "use db; show tables;"
```
## Export query results to a file
- You can use ">>" to export the query results to a file, the syntax is like `select * from table >> file`. If there is only file name without path, the file will be generated under the current working directory of TDegnine CLI.
## Import data from CSV file
- You can use `insert into table_name file 'fileName'` to import the data from the specified file into the specified table. For example, `insert into d0 file '/root/d0.csv';` means importing the data in file "/root/d0.csv" into table "d0". If there is only file name without path, that means the file is located under current working directory of TDengine CLI.
## TDengine CLI tips
- You can use the up and down keys to iterate the history of commands entered
......@@ -89,3 +97,5 @@ taos -h h1.taos.com -s "use db; show tables;"
- Execute `RESET QUERY CACHE` to clear the local cache of the table schema
- Execute SQL statements in batches. You can store a series of shell commands (ending with ;, one line for each SQL command) in a script file and execute the command `source <file-name>` in the TDengine CLI to execute all SQL commands in that file automatically
- Enter `q` to exit TDengine CLI
......@@ -178,6 +178,7 @@ description: TDengine 保留关键字的详细列表
- MATCH
- MAX_DELAY
- MAX_SPEED
- MAXROWS
- MERGE
- META
......
......@@ -89,3 +89,11 @@ taos -h h1.taos.com -s "use db; show tables;"
- 执行 `RESET QUERY CACHE` 可清除本地表 Schema 的缓存
- 批量执行 SQL 语句。可以将一系列的 TDengine CLI 命令(以英文 ; 结尾,每个 SQL 语句为一行)按行存放在文件里,在 TDengine CLI 里执行命令 `source <file-name>` 自动执行该文件里所有的 SQL 语句
- 输入 `q``quit``exit` 回车,可以退出 TDengine CLI
## TDengine CLI 导出查询结果到文件中
- 可以使用符号 “>>” 导出查询结果到某个文件中,语法为: sql 查询语句 >> ‘输出文件名’; 输出文件如果不写路径的话,将输出至当前目录下。如 select * from d0 >> ‘/root/d0.csv’; 将把查询结果输出到 /root/d0.csv 中。
## TDengine CLI 导入文件中的数据到表中
- 可以使用 insert into table_name file '输入文件名',把上一步中导出的数据文件再导入到指定表中。如 insert into d0 file '/root/d0.csv'; 表示把上面导出的数据全部再导致至 d0 表中。
......@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0;
}
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.beginOffset = *reqOffset;
if(hasData) pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
......@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
......@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
......@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
......
......@@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
......
......@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
#ifdef __cplusplus
}
#endif
......
......@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
tqInitDataRsp(&dataRsp, req.reqOffset);
dataRsp.blockNum = 0;
dataRsp.rspOffset = dataRsp.reqOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
......@@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
......@@ -715,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
tqInitDataRsp(&dataRsp, req.reqOffset);
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
......
......@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
......@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
......
......@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
tqTrace("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.
......
......@@ -20,8 +20,9 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
......@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return 0;
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->withTbName = 1;
pRsp->withSchema = 1;
......@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
......@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
} else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId);
......@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
tqInitDataRsp(&dataRsp, *pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
......@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*pBlockReturned = true;
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey);
......@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver + 1;
}
}
//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
// if(offset->type == TMQ_OFFSET__LOG){
// offset->version = ver;
// }
//}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
......@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset
tqInitDataRsp(&dataRsp, *pOffset);
// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
......@@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock
taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pOffset->version >= ver ||
dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
......@@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
// setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
......@@ -182,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SWalCkHead* pCkHead = NULL;
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
tqInitTaosxRsp(&taosxRsp, *offset);
// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
......@@ -236,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
......@@ -249,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
......@@ -279,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
......@@ -296,15 +296,13 @@ end:
}
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal offset = {0};
STqOffsetVal reqOffset = pRequest->reqOffset;
// 1. reset the offset if needed
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}
......@@ -313,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if (blockReturned) {
return 0;
}
} else if(reqOffset.type != 0){ // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
} else {
} else if(reqOffset.type == 0){ // use the consumer specified offset
uError("req offset type is 0");
return TSDB_CODE_TMQ_INVALID_MSG;
}
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} else { // todo handle the case where re-balance occurs.
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
}
}
......
......@@ -392,6 +392,9 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
code = tTombBlockPut(reader->tombBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(reader->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
break;
}
......
......@@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
......
......@@ -624,7 +624,7 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
}
}
if (nSma < pCfg->numOfColumns) {
if (nSma < pCfg->numOfColumns && nSma > 0) {
bool smaOn = false;
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " SMA(");
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
......
......@@ -848,6 +848,10 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
bool ignoreNull = getIgoreNullRes(pSup);
int32_t order = TSDB_ORDER_ASC;
if (checkWindowBoundReached(pSliceInfo)) {
return;
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
......
......@@ -1520,6 +1520,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
cleanupExprSupp(&pInfo->scalarSupp);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
taosMemoryFreeClear(param);
}
......
......@@ -20,6 +20,7 @@
#include "tutil.h"
#include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
......
......@@ -70,25 +70,16 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t endVer = committedVer;
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
if (fetchVer > endVer){
", applied index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
if (fetchVer > appliedVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
while (fetchVer <= endVer) {
while (fetchVer <= appliedVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
}
......
......@@ -126,6 +126,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
......@@ -452,7 +453,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
......@@ -794,9 +795,10 @@
,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim
#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/user/privilege_create_db.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
,,y,script,./test.sh -f tsim/db/basic1.sim
......@@ -969,6 +971,7 @@
,,y,script,./test.sh -f tsim/query/tag_scan.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/query/bug3398.sim
,,y,script,./test.sh -f tsim/query/explain_tsorder.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
......@@ -30,7 +30,15 @@ class TDTestCase:
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;'''
def checkProcessPid(self,processName):
i=0
while i<60:
......@@ -138,6 +146,8 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database test '")
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
......@@ -151,6 +161,10 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# add deleted data
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ')
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
if os.system(cmd) == 0:
......@@ -185,11 +199,19 @@ class TDTestCase:
# tdsql.query("show streams;")
# tdsql.query(f"select count(*) from {stb}")
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
tdsql.query(f"select count(*) from db4096.stb0")
# checkout db4096
tdsql.query("select count(*) from db4096.stb0")
tdsql.checkData(0,0,50000)
# checkout deleted data
tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );")
tdsql.execute("flush database deldata;")
tdsql.query("select avg(c1) from deldata.ct1;")
tdsql=tdCom.newTdSql()
tdLog.printNoPrefix(f"==========step4:verify backticks in taos Sql-TD18542")
tdLog.printNoPrefix("==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db")
tdsql.execute("create database db")
tdsql.execute("use db")
......@@ -203,6 +225,8 @@ class TDTestCase:
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
#check retentions
tdsql=tdCom.newTdSql()
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows
......@@ -222,8 +246,12 @@ class TDTestCase:
caller = inspect.getframeinfo(inspect.stack()[0][0])
args = (caller.filename, caller.lineno)
tdLog.exit("%s(%d) failed" % args)
# check stream
tdsql.query("show streams;")
tdsql.checkRows(0)
#check TS-3131
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
tdsql.checkRows(6)
expectList = [0,3003,20031,20032,20033,30031]
......@@ -238,6 +266,8 @@ class TDTestCase:
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
# check tmq
conn = taos.connect()
consumer = Consumer(
......@@ -265,6 +295,8 @@ class TDTestCase:
print(block.fetchall())
tdsql.query("show topics;")
tdsql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
drop database if exists deldata;
create database deldata duration 300;
use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;
......@@ -20,6 +20,7 @@ class TDTestCase:
tbname = "tb"
tbname1 = "tb1"
tbname2 = "tb2"
tbname3 = "tb3"
stbname = "stb"
ctbname1 = "ctb1"
ctbname2 = "ctb2"
......@@ -5607,6 +5608,44 @@ class TDTestCase:
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_single} partition by tbname range('2020-02-01 00:00:06') fill(linear)")
tdSql.checkRows(0)
#### TS-3799 ####
tdSql.execute(
f'''create table if not exists {dbname}.{tbname3} (ts timestamp, c0 double)'''
)
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:51.000000000', 4.233947800000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:52.000000000', 3.606781000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:52.500000000', 3.162353500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:53.000000000', 3.162292500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:53.500000000', 4.998230000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:54.400000000', 8.800414999999999)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:54.900000000', 8.853271500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:55.900000000', 7.507751500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:56.400000000', 7.510681000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:56.900000000', 7.841614000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:57.900000000', 8.153809000000001)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:58.500000000', 6.866455000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:59.000000000', 6.869140600000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-07 00:00:00.000000000', 0.261475000000001)")
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(next)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, 4.233947800000000)
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(value, 1)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, 1)
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(null)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, None)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -27,7 +27,7 @@ import threading
import time
import json
BASEVERSION = "3.0.7.0"
BASEVERSION = "3.1.0.0"
class TDTestCase:
......@@ -37,6 +37,15 @@ class TDTestCase:
tdSql.init(conn.cursor())
self.host = socket.gethostname()
self.replicaVar = int(replicaVar)
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;'''
def checkProcessPid(self,processName):
i=0
......@@ -245,6 +254,9 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
# self.buildTaosd(bPath)
# add deleted data
os.system(f'LD_LIBRARY_PATH=/usr/lib taos -s "{self.deletedDataSql}" ')
threads=[]
threads.append(threading.Thread(target=self.insertAllData, args=(cPath_temp,dbname,tableNumbers1,recordNumbers1)))
for tr in threads:
......@@ -285,6 +297,11 @@ class TDTestCase:
tdsql1.query(f"select count(*) from db4096.stb0")
tdsql1.checkData(0,0,50000)
# checkout deleted data
tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );")
tdsql.query("flush database deldata;select avg(c1) from deldata.ct1;")
# tdsql1.query("show streams;")
# tdsql1.checkRows(2)
tdsql1.query("select *,tbname from d0.almlog where mcid='m0103';")
......
......@@ -36,7 +36,7 @@ class TDTestCase:
# tdDnodes[1].cfgDir
cfgFile = f"%s/taos.cfg"%(cfgDir)
shellCmd = 'echo "tmqMaxTopicNum %d" >> %s'%(tmqMaxTopicNum, cfgFile)
shellCmd = 'echo tmqMaxTopicNum %d >> %s'%(tmqMaxTopicNum, cfgFile)
tdLog.info(" shell cmd: %s"%(shellCmd))
os.system(shellCmd)
tdDnodes.stoptaosd(1)
......
......@@ -131,7 +131,7 @@ class TDTestCase:
if snapshot_value == "true":
if offset_value != "earliest" and offset_value != "":
if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
......@@ -154,7 +154,7 @@ class TDTestCase:
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else:
if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
buildPath = tdCom.getBuildPath()
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
tdLog.info(cmdStr1)
os.system(cmdStr1)
time.sleep(15)
cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath)
tdLog.info(cmdStr2)
os.system(cmdStr2)
time.sleep(20)
os.system("kill -9 `pgrep taosBenchmark`")
result = os.system("kill -9 `pgrep tmq_offset_test`")
if result != 0:
tdLog.exit("tmq_offset_test error!")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_offset_test tmq_offset_test.c)
target_link_libraries(
tmq_offset
PUBLIC taos
......@@ -42,6 +43,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_offset_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
write_raw_block_test
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int buildData(TAOS* pConn){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists tp");
if (taos_errno(pRes) != 0) {
printf("error in drop tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
if (taos_errno(pRes) != 0) {
printf("failed to create table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now + 1s, 2)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic tp as select * from t1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
void test_offset(TAOS* pConn){
if(buildData(pConn) != 0){
ASSERT(0);
}
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign1 = NULL;
int32_t numOfAssign1 = 0;
tmq_topic_assignment* pAssign2 = NULL;
int32_t numOfAssign2 = 0;
tmq_topic_assignment* pAssign3 = NULL;
int32_t numOfAssign3 = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign1, &numOfAssign1);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign1);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign2, &numOfAssign2);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign2);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign3, &numOfAssign3);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign3);
tmq_consumer_close(tmq);
ASSERT(0);
return;
}
ASSERT(numOfAssign1 == 2);
ASSERT(numOfAssign1 == numOfAssign2);
ASSERT(numOfAssign1 == numOfAssign3);
for(int i = 0; i < numOfAssign1; i++){
int j = 0;
int k = 0;
for(; j < numOfAssign2; j++){
if(pAssign1[i].vgId == pAssign2[j].vgId){
break;
}
}
for(; k < numOfAssign3; k++){
if(pAssign1[i].vgId == pAssign3[k].vgId){
break;
}
}
ASSERT(pAssign1[i].currentOffset == pAssign2[j].currentOffset);
ASSERT(pAssign1[i].currentOffset == pAssign3[k].currentOffset);
ASSERT(pAssign1[i].begin == pAssign2[j].begin);
ASSERT(pAssign1[i].begin == pAssign3[k].begin);
ASSERT(pAssign1[i].end == pAssign2[j].end);
ASSERT(pAssign1[i].end == pAssign3[k].end);
}
tmq_free_assignment(pAssign1);
tmq_free_assignment(pAssign2);
tmq_free_assignment(pAssign3);
int cnt = 0;
int offset1 = -1;
int offset2 = -1;
while (cnt++ < 10) {
printf("start to poll:%d\n", cnt);
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
for(int i = 0; i < numOfAssign; i++){
int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
if(position == 0) continue;
printf("position = %d\n", (int)position);
tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
ASSERT(position == committed);
}
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
if(offset1 != -1){
ASSERT(offset1 == pAssign[0].currentOffset);
}
if(offset2 != -1){
ASSERT(offset2 == pAssign[1].currentOffset);
}
offset1 = pAssign[0].currentOffset;
offset2 = pAssign[1].currentOffset;
tmq_free_assignment(pAssign);
taos_free_result(pRes);
}
}
tmq_consumer_close(tmq);
}
// run taosBenchmark first
void test_ts3756(TAOS* pConn){
TAOS_RES*pRes = taos_query(pConn, "use test");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists t1");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as select * from meters");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
while (1) {
// printf("start to poll\n");
pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssignTmp = NULL;
int32_t numOfAssignTmp = 0;
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssignTmp, &numOfAssignTmp);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
if(numOfAssign != 0){
int i = 0;
for(; i < numOfAssign; i++){
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
break;
}
}
if(i == numOfAssign){
ASSERT(0);
}
tmq_free_assignment(pAssign);
}
numOfAssign = numOfAssignTmp;
pAssign = pAssignTmp;
taos_free_result(pRes);
}
}
tmq_free_assignment(pAssign);
}
int main(int argc, char* argv[]) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
test_offset(pConn);
test_ts3756(pConn);
taos_close(pConn);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册