提交 f2297ddf 编写于 作者: J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into main

...@@ -16,7 +16,7 @@ This statement creates a user account. ...@@ -16,7 +16,7 @@ This statement creates a user account.
The maximum length of user_name is 23 bytes. The maximum length of user_name is 23 bytes.
The maximum length of password is 128 bytes. The password can include leters, digits, and special characters excluding single quotation marks, double quotation marks, backticks, backslashes, and spaces. The password cannot be empty. The maximum length of password is 32 bytes. The password can include leters, digits, and special characters excluding single quotation marks, double quotation marks, backticks, backslashes, and spaces. The password cannot be empty.
`SYSINFO` indicates whether the user is allowed to view system information. `1` means allowed, `0` means not allowed. System information includes server configuration, dnode, vnode, storage. The default value is `1`. `SYSINFO` indicates whether the user is allowed to view system information. `1` means allowed, `0` means not allowed. System information includes server configuration, dnode, vnode, storage. The default value is `1`.
......
...@@ -16,7 +16,7 @@ CREATE USER use_name PASS 'password' [SYSINFO {1|0}]; ...@@ -16,7 +16,7 @@ CREATE USER use_name PASS 'password' [SYSINFO {1|0}];
use_name 最长为 23 字节。 use_name 最长为 23 字节。
password 最长为 128 字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。 password 最长为 32 字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。
SYSINFO 表示用户是否可以查看系统信息。1 表示可以查看,0 表示不可以查看。系统信息包括服务端配置信息、服务端各种节点信息(如 DNODE、QNODE等)、存储相关的信息等。默认为可以查看系统信息。 SYSINFO 表示用户是否可以查看系统信息。1 表示可以查看,0 表示不可以查看。系统信息包括服务端配置信息、服务端各种节点信息(如 DNODE、QNODE等)、存储相关的信息等。默认为可以查看系统信息。
......
...@@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm ...@@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam); pCommitFp(tmq, code, userParam);
} }
// update the offset value.
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
} else { // do not perform commit, callback user function directly. } else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam); pCommitFp(tmq, code, userParam);
...@@ -1424,7 +1426,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1424,7 +1426,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
...@@ -1570,7 +1572,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1570,7 +1572,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf); vgKey, buf);
...@@ -1798,7 +1800,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p ...@@ -1798,7 +1800,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0; int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN]; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
...@@ -1944,7 +1946,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1944,7 +1946,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) { if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
...@@ -2041,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -2041,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
...@@ -2675,7 +2677,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2675,7 +2677,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0; int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN]; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
...@@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
// char offsetBuf[TSDB_OFFSET_LEN] = {0}; // char offsetBuf[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset); tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
...@@ -2792,11 +2794,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2792,11 +2794,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
} }
// update the offset, and then commit to vnode // update the offset, and then commit to vnode
if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { // if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.version = offset; pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->committedOffset.version = INT64_MIN; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pVg->seekUpdated = true; pOffsetInfo->committedOffset.version = INT64_MIN;
} pVg->seekUpdated = true;
// }
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
......
...@@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) { ...@@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
TEST(clientCase, td_25129) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 2000;
int32_t count = 0;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[128];
const char* topicName = tmq_get_topic_name(pRes);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
//
// printf("topic: %s\n", topicName);
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);
printSubResults(pRes, &totalRows);
} else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
continue;
}
// tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
break;
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, sub_tb_test) { TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
......
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "version.h" #include "version.h"
#ifdef TD_JEMALLOC_ENABLED
#include "jemalloc/jemalloc.h"
#endif
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h" #include "cus_name.h"
...@@ -255,6 +258,10 @@ static void taosCleanupArgs() { ...@@ -255,6 +258,10 @@ static void taosCleanupArgs() {
} }
int main(int argc, char const *argv[]) { int main(int argc, char const *argv[]) {
#ifdef TD_JEMALLOC_ENABLED
bool jeBackgroundThread = true;
mallctl("background_thread", NULL, NULL, &jeBackgroundThread, sizeof(bool));
#endif
if (!taosCheckSystemIsLittleEnd()) { if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n"); printf("failed to start since on non-little-end machines\n");
return -1; return -1;
......
...@@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId); int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); //int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta // tqMeta
int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaOpen(STQ* pTq);
......
...@@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData ...@@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
return 0; return 0;
} }
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0}; SMqPollReq req = {0};
dataRsp.head.consumerId = pHandle->consumerId; if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) {
dataRsp.head.epoch = pHandle->epoch; tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen);
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; terrno = TSDB_CODE_INVALID_MSG;
return -1;
int64_t sver = 0, ever = 0; }
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
ever);
char buf1[TSDB_OFFSET_LEN] = {0}; SMqDataRsp dataRsp = {0};
char buf2[TSDB_OFFSET_LEN] = {0}; tqInitDataRsp(&dataRsp, &req);
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); dataRsp.blockNum = 0;
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); dataRsp.rspOffset = dataRsp.reqOffset;
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); tDeleteMqDataRsp(&dataRsp);
return 0; return 0;
} }
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
// SMqDataRsp dataRsp = {0};
// dataRsp.head.consumerId = pHandle->consumerId;
// dataRsp.head.epoch = pHandle->epoch;
// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
// ever);
//
// char buf1[TSDB_OFFSET_LEN] = {0};
// char buf2[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
// return 0;
//}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) { int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
...@@ -490,7 +507,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -490,7 +507,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (!exec) { if (!exec) {
tqSetHandleExec(pHandle); tqSetHandleExec(pHandle);
// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
req.subKey, pHandle); req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
break; break;
...@@ -510,7 +527,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -510,7 +527,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pHandle->epoch = reqEpoch; pHandle->epoch = reqEpoch;
} }
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
...@@ -518,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -518,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
tqSetHandleIdle(pHandle); tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
req.subKey, pHandle); req.subKey, pHandle);
return code; return code;
} }
...@@ -571,10 +588,26 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -571,10 +588,26 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
if (reqOffset.type == TMQ_OFFSET__LOG) { if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version; dataRsp.rspOffset.version = reqOffset.version;
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { } else if(reqOffset.type < 0){
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pOffset != NULL) {
dataRsp.rspOffset.version = ever; if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.version = pOffset->val.version;
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
}else{
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
}
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
}
} else { } else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type); reqOffset.type);
...@@ -582,7 +615,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -582,7 +615,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
return -1; return -1;
} }
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
......
...@@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { ...@@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
} else { } else {
tqPushDataRsp(pHandle, vgId); // tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
void* tmp = pHandle->msg->pCont; void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp; pHandle->msg->pCont = tmp;
...@@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { ...@@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) { if(pHandle->msg != NULL) {
tqPushDataRsp(pHandle, vgId); // tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont); rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg); taosMemoryFree(pHandle->msg);
......
...@@ -99,7 +99,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -99,7 +99,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pOffset != NULL) { if (pOffset != NULL) {
*pOffsetVal = pOffset->val; *pOffsetVal = pOffset->val;
char formatBuf[TSDB_OFFSET_LEN]; char formatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 tqDebug("tmq poll: consumer:0x%" PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64, ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
...@@ -162,6 +162,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -162,6 +162,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
uint64_t consumerId = pRequest->consumerId; uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int code = 0; int code = 0;
terrno = 0;
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest); tqInitDataRsp(&dataRsp, pRequest);
......
...@@ -127,7 +127,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem ...@@ -127,7 +127,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){ if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){
code = TSDB_CODE_SML_INVALID_DATA; code = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildCol error col not same %s", pTagSchema->name); uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
goto end; goto end;
} }
...@@ -210,7 +210,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 ...@@ -210,7 +210,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
SSmlKv* kv = (SSmlKv*)data; SSmlKv* kv = (SSmlKv*)data;
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
ret = TSDB_CODE_SML_INVALID_DATA; ret = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildCol error col not same %s", pColSchema->name); uInfo("SML smlBuildCol error col not same %s", pColSchema->name);
goto end; goto end;
} }
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
......
...@@ -403,6 +403,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -403,6 +403,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// wait for the task to be ready to go // wait for the task to be ready to go
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus); int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
break;
}
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
taosMsleep(100); taosMsleep(100);
......
...@@ -546,7 +546,7 @@ class TDTestCase: ...@@ -546,7 +546,7 @@ class TDTestCase:
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\ enable.auto.commit:false,\
auto.commit.interval.ms:6000,\ auto.commit.interval.ms:6000,\
auto.offset.reset:none' auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit) self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor") tdLog.info("again start consume processor")
...@@ -569,7 +569,7 @@ class TDTestCase: ...@@ -569,7 +569,7 @@ class TDTestCase:
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\ enable.auto.commit:false,\
auto.commit.interval.ms:6000,\ auto.commit.interval.ms:6000,\
auto.offset.reset:none' auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor") tdLog.info("again start consume processor")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册