提交 49757faf 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-25528-3.0x

......@@ -62,12 +62,13 @@ serverPort 6030
For all the dnodes in a TDengine cluster, the below parameters must be configured exactly the same, any node whose configuration is different from dnodes already in the cluster can't join the cluster.
| **#** | **Parameter** | **Definition** |
| ----- | ------------------ | ------------------------------------------- |
| 1 | statusInterval | The interval by which dnode reports its status to mnode |
| 2 | timezone | Timezone |
| 3 | locale | System region and encoding |
| 4 | charset | Character set |
| **#** | **Parameter** | **Definition** |
| ----- | ---------------- | ----------------------------------------------------------------------------- |
| 1 | statusInterval | The interval by which dnode reports its status to mnode |
| 2 | timezone | Timezone |
| 3 | locale | System region and encoding |
| 4 | charset | Character set |
| 5 | ttlChangeOnWrite | Whether the ttl expiration time changes with the table modification operation |
## Start Cluster
......@@ -97,7 +98,7 @@ Then, on the first dnode i.e. h1.tdengine.com in our example, use TDengine CLI `
CREATE DNODE "h2.taos.com:6030";
````
This adds the end point of the new dnode (from Step 4) into the end point list of the cluster. In the command "fqdn:port" should be quoted using double quotes. Change `"h2.taos.com:6030"` to the end point of your new dnode.
This adds the end point of the new dnode (from Step 4) into the end point list of the cluster. In the command "fqdn:port" should be quoted using double quotes. Change `"h2.taos.com:6030"` to the end point of your new dnode.
Then on the first dnode h1.tdengine.com, execute `show dnodes` in `taos`
......
......@@ -698,7 +698,7 @@ ELAPSED(ts_primary_key [, time_unit])
LEASTSQUARES(expr, start_val, step_val)
```
**Description**: The linear regression function of the specified column and the timestamp column (primary key), `start_val` is the initial value and `step_val` is the step value.
**Description**: The linear regression function of a specified column, `start_val` is the initial value and `step_val` is the step value.
**Return value type**: A string in the format of "(slope, intercept)"
......
......@@ -62,12 +62,13 @@ serverPort 6030
加入到集群中的数据节点 dnode,下表中涉及集群相关的参数必须完全相同,否则不能成功加入到集群中。
| **#** | **配置参数名称** | **含义** |
| ----- | ------------------ | ------------------------------------------- |
| 1 | statusInterval | dnode 向 mnode 报告状态时长 |
| 2 | timezone | 时区 |
| 3 | locale | 系统区位信息及编码格式 |
| 4 | charset | 字符集编码 |
| **#** | **配置参数名称** | **含义** |
| ----- | ---------------- | ------------------------------------ |
| 1 | statusInterval | dnode 向 mnode 报告状态时长 |
| 2 | timezone | 时区 |
| 3 | locale | 系统区位信息及编码格式 |
| 4 | charset | 字符集编码 |
| 5 | ttlChangeOnWrite | ttl 到期时间是否伴随表的修改操作改变 |
## 启动集群
......@@ -196,10 +197,10 @@ dnodeID 是集群自动分配的,不得人工指定。它在生成时是递增
1、建立集群时使用 CREATE DNODE 增加新节点后,新节点始终显示 offline 状态?
```sql
1)首先要检查增加的新节点上的 taosd 服务是否已经正常启动
2)如果已经启动,再检查到新节点的网络是否通畅,可以使用 ping fqdn 验证下
3)如果前面两步都没有问题,这一步要检查新节点做为独立集群在运行了,可以使用 taos -h fqdn 连接上后,show dnodes; 命令查看.
如果显示的列表与你主节点上显示的不一致,说明此节点自己单独成立了一个集群,解决的方法是停止新节点上的服务,然后清空新节点上
如果显示的列表与你主节点上显示的不一致,说明此节点自己单独成立了一个集群,解决的方法是停止新节点上的服务,然后清空新节点上
taos.cfg 中配置的 dataDir 目录下的所有文件,重新启动新节点服务即可解决。
```
```
......@@ -700,7 +700,7 @@ ELAPSED(ts_primary_key [, time_unit])
LEASTSQUARES(expr, start_val, step_val)
```
**功能说明**:统计表中某列的值是主键(时间戳)的拟合直线方程。start_val 是自变量初始值,step_val 是自变量的步长值。
**功能说明**:统计表中某列的值的拟合直线方程。start_val 是自变量初始值,step_val 是自变量的步长值。
**返回数据类型**:字符串表达式(斜率, 截距)。
......
......@@ -153,7 +153,6 @@ struct SWalReader {
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
// TODO remove it
SWalCkHead *pHead;
};
......@@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
// only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
int32_t walFetchHead(SWalReader *pRead, int64_t ver);
int32_t walFetchBody(SWalReader *pRead);
int32_t walSkipFetchBody(SWalReader *pRead);
void walRefFirstVer(SWal *, SWalRef *);
void walRefLastVer(SWal *, SWalRef *);
......
......@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0;
}
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.beginOffset = *reqOffset;
if(hasData) pVg->offsetInfo.beginOffset = *reqOffset;
pVg->offsetInfo.endOffset = *rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
......@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
......@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
......@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
}
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
......
......@@ -1442,178 +1442,4 @@ TEST(clientCase, sub_tb_mt_test) {
}
}
TEST(clientCase, ts_3756) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 200;
int32_t count = 0;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
while (1) {
printf("start to poll\n");
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[128];
const char* topicName = tmq_get_topic_name(pRes);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
//
// printf("topic: %s\n", topicName);
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);
printSubResults(pRes, &totalRows);
tmq_topic_assignment* pAssignTmp = NULL;
int32_t numOfAssignTmp = 0;
code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end);
}
if(numOfAssign != 0){
int i = 0;
for(; i < numOfAssign; i++){
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
break;
}
}
if(i == numOfAssign){
printf("all position is same\n");
break;
}
tmq_free_assignment(pAssign);
}
numOfAssign = numOfAssignTmp;
pAssign = pAssignTmp;
} else {
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
// tmq_commit_sync(tmq, pRes);
continue;
}
// tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
// break;
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
#pragma GCC diagnostic pop
......@@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
}
taosArrayDestroy(pConsumerEp->vgs);
......
......@@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
......@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
#ifdef __cplusplus
}
#endif
......
......@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
}
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
tqInitDataRsp(&dataRsp, req.reqOffset);
dataRsp.blockNum = 0;
dataRsp.rspOffset = dataRsp.reqOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
......@@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
......@@ -719,7 +717,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
tqInitDataRsp(&dataRsp, req.reqOffset);
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
......
......@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
......@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
......
......@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
tqTrace("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.
......
......@@ -184,70 +184,63 @@ end:
return tbSuid == realTbSuid;
}
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
int32_t code = 0;
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
int32_t code = -1;
int32_t vgId = TD_VID(pTq->pVnode);
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset;
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
vgId, offset, lastVer, committedVer, appliedVer);
while (offset <= appliedVer) {
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset;
code = -1;
goto END;
}
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) {
*fetchOffset = offset;
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader);
goto END;
} else {
if (pHandle->fetchMeta != WITH_DATA) {
SWalCont* pHead = &((*ppCkHead)->head);
SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
code = walFetchBody(pHandle->pWalReader);
if (code < 0) {
*fetchOffset = offset;
code = -1;
goto END;
}
pHead = &(pHandle->pWalReader->pHead->head);
if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset;
code = 0;
goto END;
} else {
offset++;
code = -1;
continue;
}
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
code = walSkipFetchBody(pHandle->pWalReader);
if (code < 0) {
*fetchOffset = offset;
code = -1;
goto END;
}
offset++;
}
code = -1;
}
END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
*fetchOffset = offset;
return code;
}
......
......@@ -20,8 +20,9 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
......@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
return 0;
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->withTbName = 1;
pRsp->withSchema = 1;
......@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
......@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
} else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId);
......@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
tqInitDataRsp(&dataRsp, *pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
......@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
*pBlockReturned = true;
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
} else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey);
......@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver + 1;
}
}
//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
// if(offset->type == TMQ_OFFSET__LOG){
// offset->version = ver;
// }
//}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
......@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset
tqInitDataRsp(&dataRsp, *pOffset);
// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
......@@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// lock
taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pOffset->version >= ver ||
dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data
if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
......@@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
taosWUnLockLatch(&pTq->lock);
}
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
// setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
......@@ -179,11 +179,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
tqInitTaosxRsp(&taosxRsp, *offset);
// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
......@@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto end;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0;
while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
......@@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
SWalCont* pHead = &pCkHead->head;
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
......@@ -249,7 +241,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
......@@ -279,7 +271,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
......@@ -291,20 +283,17 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
end:
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal offset = {0};
STqOffsetVal reqOffset = pRequest->reqOffset;
// 1. reset the offset if needed
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}
......@@ -313,20 +302,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
if (blockReturned) {
return 0;
}
} else if(reqOffset.type != 0){ // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
} else {
} else if(reqOffset.type == 0){ // use the consumer specified offset
uError("req offset type is 0");
return TSDB_CODE_TMQ_INVALID_MSG;
}
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
} else { // todo handle the case where re-balance occurs.
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
}
}
......
......@@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
......
......@@ -2350,7 +2350,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "leastsquares",
.type = FUNCTION_TYPE_LEASTSQUARES,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC,
.translateFunc = translateLeastSQR,
.getEnvFunc = getLeastSQRFuncEnv,
.initFunc = leastSQRFunctionSetup,
......@@ -2916,8 +2916,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
......@@ -2928,8 +2927,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "unique",
.type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv,
.initFunc = uniqueFunctionSetup,
......
......@@ -16,10 +16,6 @@
#include "taoserror.h"
#include "walInt.h"
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
if (pReader == NULL) {
......@@ -70,38 +66,29 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
// int64_t appliedVer = walGetAppliedVer(pReader->pWal);
// if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
// wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// }
// int64_t endVer = TMIN(appliedVer, committedVer);
int64_t endVer = committedVer;
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
if (fetchVer > endVer){
", applied index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
if (fetchVer > appliedVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
while (fetchVer <= appliedVer) {
if (walFetchHead(pReader, fetchVer) < 0) {
return -1;
}
int32_t type = pReader->pHead->head.msgType;
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
if (walFetchBodyNew(pReader) < 0) {
if (walFetchBody(pReader) < 0) {
return -1;
}
return 0;
} else {
if (walSkipFetchBodyNew(pReader) < 0) {
if (walSkipFetchBody(pReader) < 0) {
return -1;
}
......@@ -263,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
return 0;
}
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curVersion != fetchVer) {
if (walReaderSeekVer(pRead, fetchVer) < 0) {
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
if(walReadSeekVerImpl(pRead, fetchVer) < 0){
return -1;
}
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
return -1;
}
}
// pRead->curInvalid = 0;
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pReader) {
SWalCont *pReadHead = &pReader->pHead->head;
int64_t ver = pReadHead->version;
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
pReadHead->bodyLen);
if (pReader->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReader->pHead = ptr;
pReadHead = &pReader->pHead->head;
pReader->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
return -1;
}
if (walValidBodyCksum(pReader->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
pReader->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
// pRead->curInvalid = 1;
return -1;
}
pRead->curVersion++;
wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
int64_t code;
int64_t contLen;
bool seeked = false;
......@@ -378,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if (pRead->curVersion != ver) {
code = walReaderSeekVer(pRead, ver);
if (code < 0) {
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
......@@ -401,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
// pRead->curInvalid = 1;
return -1;
}
}
code = walValidHeadCksum(pHead);
code = walValidHeadCksum(pRead->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
......@@ -414,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1;
}
// pRead->curInvalid = 0;
return 0;
}
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
int32_t walSkipFetchBody(SWalReader *pRead) {
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
// pRead->curInvalid = 1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int32_t walFetchBody(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version;
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
......@@ -448,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->vers.appliedVer);
if (pRead->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
*ppHead = ptr;
pReadHead = &((*ppHead)->head);
pRead->pHead = ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
......@@ -468,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
// pRead->curInvalid = 1;
return -1;
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pReadHead->version, ver);
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(*ppHead) != 0) {
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
pRead->curVersion = ver + 1;
pRead->curVersion++;
return 0;
}
......
......@@ -127,6 +127,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
......@@ -455,7 +456,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
......
......@@ -219,15 +219,20 @@ class TDTestCase:
tdSql.checkRows(1)
res = tdSql.getData(0, 0)
if res is None:
tdLog.exit("result is not correct")
tdLog.exit("result is not correct")
def __test_current(self):
def __test_current(self, dbname=DBNAME):
# tdSql.query("explain select c1 from {dbname}.ct1")
# tdSql.query("explain select 1 from {dbname}.ct2")
# tdSql.query("explain select cast(ceil(c6) as bigint) from {dbname}.ct4 group by c6")
# tdSql.query("explain select count(c3) from {dbname}.ct4 group by c7 having count(c3) > 0")
# tdSql.query("explain select ct2.c3 from {dbname}.ct4 join ct2 on ct4.ts=ct2.ts")
# tdSql.query("explain select c1 from stb1 where c1 is not null and c1 in (0, 1, 2) or c1 between 2 and 100 ")
#
tdSql.query(f"select leastsquares(c1, 1, 1) from (select c1 from {dbname}.nt1 group by c1)")
tdSql.query(f"select leastsquares(c1, 1, 1) from (select c1 from {dbname}.nt1 partition by c1)")
tdSql.query(f"select leastsquares(c1, 1, 1) from (select c1 from {dbname}.nt1 order by c1)")
tdSql.query(f"select leastsquares(c1, 1, 1) from (select c1 from {dbname}.nt1 union select c1 from {dbname}.nt1)")
self.leastsquares_check()
......
......@@ -412,6 +412,18 @@ class TDTestCase:
tdSql.checkData(0,0,4)
tdSql.checkData(1,0,1)
tdSql.query(f"select tail(a, 1) from (select _rowts, first(c2) as a from {dbname}.ct1 group by c2);")
tdSql.checkRows(1)
tdSql.query(f"select tail(a, 1) from (select _rowts, first(c2) as a from {dbname}.ct1 partition by c2);")
tdSql.checkRows(1)
tdSql.query(f"select tail(a, 1) from (select _rowts, first(c2) as a from {dbname}.ct1 order by c2);")
tdSql.checkRows(1)
tdSql.query(f"select tail(a, 1) from (select _rowts, first(c2) as a from {dbname}.ct1 union select _rowts, first(c2) as a from {dbname}.ct1);")
tdSql.checkRows(1)
def check_boundary_values(self, dbname="bound_test"):
tdSql.execute(f"drop database if exists {dbname}")
......
......@@ -438,6 +438,15 @@ class TDTestCase:
tdSql.checkData(0,0,4)
tdSql.checkData(1,0,1)
tdSql.query(f"select unique(c1) v from (select _rowts, c1 from {dbname}.ct1 partition by c2)")
tdSql.checkRows(10)
tdSql.query(f"select unique(c1) v from (select _rowts, c1 from {dbname}.ct1 order by c2)")
tdSql.checkRows(10)
tdSql.query(f"select unique(c1) v from (select _rowts, c1 from {dbname}.ct1 union all select _rowts, c1 from {dbname}.ct1)")
tdSql.checkRows(10)
# TD-19911
tdSql.error("select unique(mode(12)) from (select _rowts , t1 , tbname from db.stb1 );")
tdSql.error("select unique(mode(t1,1)) from (select _rowts , t1 , tbname from db.stb1 );")
......
......@@ -36,7 +36,7 @@ class TDTestCase:
# tdDnodes[1].cfgDir
cfgFile = f"%s/taos.cfg"%(cfgDir)
shellCmd = 'echo "tmqMaxTopicNum %d" >> %s'%(tmqMaxTopicNum, cfgFile)
shellCmd = 'echo tmqMaxTopicNum %d >> %s'%(tmqMaxTopicNum, cfgFile)
tdLog.info(" shell cmd: %s"%(shellCmd))
os.system(shellCmd)
tdDnodes.stoptaosd(1)
......
......@@ -131,7 +131,7 @@ class TDTestCase:
if snapshot_value == "true":
if offset_value != "earliest" and offset_value != "":
if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
......@@ -154,7 +154,7 @@ class TDTestCase:
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else:
if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace(offset_value, "0")), subscription_info))
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
buildPath = tdCom.getBuildPath()
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
tdLog.info(cmdStr1)
os.system(cmdStr1)
time.sleep(15)
cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath)
tdLog.info(cmdStr2)
os.system(cmdStr2)
time.sleep(20)
os.system("kill -9 `pgrep taosBenchmark`")
result = os.system("kill -9 `pgrep tmq_offset_test`")
if result != 0:
tdLog.exit("tmq_offset_test error!")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_offset_test tmq_offset_test.c)
target_link_libraries(
tmq_offset
PUBLIC taos
......@@ -42,6 +43,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_offset_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
write_raw_block_test
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int buildData(TAOS* pConn){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists tp");
if (taos_errno(pRes) != 0) {
printf("error in drop tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
if (taos_errno(pRes) != 0) {
printf("failed to create table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now + 1s, 2)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic tp as select * from t1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
void test_offset(TAOS* pConn){
if(buildData(pConn) != 0){
ASSERT(0);
}
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign1 = NULL;
int32_t numOfAssign1 = 0;
tmq_topic_assignment* pAssign2 = NULL;
int32_t numOfAssign2 = 0;
tmq_topic_assignment* pAssign3 = NULL;
int32_t numOfAssign3 = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign1, &numOfAssign1);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign1);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign2, &numOfAssign2);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign2);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign3, &numOfAssign3);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign3);
tmq_consumer_close(tmq);
ASSERT(0);
return;
}
ASSERT(numOfAssign1 == 2);
ASSERT(numOfAssign1 == numOfAssign2);
ASSERT(numOfAssign1 == numOfAssign3);
for(int i = 0; i < numOfAssign1; i++){
int j = 0;
int k = 0;
for(; j < numOfAssign2; j++){
if(pAssign1[i].vgId == pAssign2[j].vgId){
break;
}
}
for(; k < numOfAssign3; k++){
if(pAssign1[i].vgId == pAssign3[k].vgId){
break;
}
}
ASSERT(pAssign1[i].currentOffset == pAssign2[j].currentOffset);
ASSERT(pAssign1[i].currentOffset == pAssign3[k].currentOffset);
ASSERT(pAssign1[i].begin == pAssign2[j].begin);
ASSERT(pAssign1[i].begin == pAssign3[k].begin);
ASSERT(pAssign1[i].end == pAssign2[j].end);
ASSERT(pAssign1[i].end == pAssign3[k].end);
}
tmq_free_assignment(pAssign1);
tmq_free_assignment(pAssign2);
tmq_free_assignment(pAssign3);
int cnt = 0;
int offset1 = -1;
int offset2 = -1;
while (cnt++ < 10) {
printf("start to poll:%d\n", cnt);
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
for(int i = 0; i < numOfAssign; i++){
int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
if(position == 0) continue;
printf("position = %d\n", (int)position);
tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
ASSERT(position == committed);
}
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
if(offset1 != -1){
ASSERT(offset1 == pAssign[0].currentOffset);
}
if(offset2 != -1){
ASSERT(offset2 == pAssign[1].currentOffset);
}
offset1 = pAssign[0].currentOffset;
offset2 = pAssign[1].currentOffset;
tmq_free_assignment(pAssign);
taos_free_result(pRes);
}
}
tmq_consumer_close(tmq);
}
// run taosBenchmark first
void test_ts3756(TAOS* pConn){
TAOS_RES*pRes = taos_query(pConn, "use test");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists t1");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as select * from meters");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
while (1) {
// printf("start to poll\n");
pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssignTmp = NULL;
int32_t numOfAssignTmp = 0;
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssignTmp, &numOfAssignTmp);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
if(numOfAssign != 0){
int i = 0;
for(; i < numOfAssign; i++){
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
break;
}
}
if(i == numOfAssign){
ASSERT(0);
}
tmq_free_assignment(pAssign);
}
numOfAssign = numOfAssignTmp;
pAssign = pAssignTmp;
taos_free_result(pRes);
}
}
tmq_free_assignment(pAssign);
}
int main(int argc, char* argv[]) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
test_offset(pConn);
test_ts3756(pConn);
taos_close(pConn);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册