提交 9378a0ef 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'main' of https://github.com/taosdata/TDengine into mark/tmq

......@@ -298,7 +298,7 @@ You configure the following parameters when creating a consumer:
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without committed offset|
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false
......
......@@ -630,6 +630,11 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER;
}
if (strlen(createReq.pass) >= TSDB_PASSWORD_LEN){
terrno = TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, createReq.user);
if (pUser != NULL) {
terrno = TSDB_CODE_MND_USER_ALREADY_EXIST;
......
......@@ -73,7 +73,7 @@ typedef struct SCliConn {
SDelayTask* task;
char* ip;
char* dstAddr;
char src[32];
char dst[32];
......@@ -196,6 +196,7 @@ static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later
static void cliHandleResp(SCliConn* conn);
// handle except about conn
......@@ -543,6 +544,7 @@ void cliConnTimeout(uv_timer_t* handle) {
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, UV_ECANCELED);
}
void cliReadTimeoutCb(uv_timer_t* handle) {
......@@ -719,7 +721,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false);
if (conn->list == NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
}
SConnList* pList = conn->list;
......@@ -878,7 +880,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
connList->list->numOfConn--;
connList->size--;
} else {
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
if (connList != NULL) connList->list->numOfConn--;
}
conn->list = NULL;
......@@ -923,7 +925,7 @@ static void cliDestroy(uv_handle_t* handle) {
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip);
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn->stream);
cliDestroyConnMsgs(conn, true);
......@@ -1168,7 +1170,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (conn == NULL) {
conn = cliCreateConn(pThrd);
conn->pBatch = pBatch;
conn->ip = taosStrdup(pList->dst);
conn->dstAddr = taosStrdup(pList->dst);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
if (ipaddr == 0xffffffff) {
......@@ -1213,6 +1215,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return;
}
......@@ -1271,11 +1275,11 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
STraceId* trace = &pMsg->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1);
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
......@@ -1287,12 +1291,12 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem));
}
}
} else {
tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
pConn, pConn->ip, uv_strerror(status));
pConn, pConn->dstAddr, uv_strerror(status));
cliDestroyBatch(pConn->pBatch);
pConn->pBatch = NULL;
}
......@@ -1314,6 +1318,7 @@ void cliConnCb(uv_connect_t* req, int status) {
}
if (status != 0) {
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
if (timeout == false) {
cliHandleFastFail(pConn, status);
} else if (timeout == true) {
......@@ -1483,9 +1488,34 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
}
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
// impl later
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
if (addr != 0xffffffff) {
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
if (addr != *v) {
char old[64] = {0}, new[64] = {0};
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
}
}
return;
}
static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) {
if (dst == NULL) return;
int16_t i = 0, len = strlen(dst);
for (i = len - 1; i >= 0; i--) {
if (dst[i] == ':') break;
}
if (i > 0) {
char fqdn[TSDB_FQDN_LEN + 1] = {0};
memcpy(fqdn, dst, i);
cliUpdateFqdnCache(cache, fqdn);
}
}
static void doFreeTimeoutMsg(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
......@@ -1560,7 +1590,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
conn->ip = taosStrdup(addr);
conn->dstAddr = taosStrdup(addr);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
if (ipaddr == 0xffffffff) {
......@@ -1578,7 +1608,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(port);
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
pThrd->newConnCount++;
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
if (fd == -1) {
......@@ -1608,6 +1638,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, ret);
return;
}
......
......@@ -33,7 +33,7 @@ class TDTestCase:
self.colname_length_boundary = self.boundary.COL_KEY_MAX_LENGTH
self.tagname_length_boundary = self.boundary.TAG_KEY_MAX_LENGTH
self.username_length_boundary = 23
self.password_length_boundary = 128
self.password_length_boundary = 31
def dbname_length_check(self):
dbname_length = randint(1,self.dbname_length_boundary-1)
for dbname in [tdCom.get_long_name(self.dbname_length_boundary),tdCom.get_long_name(dbname_length)]:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册