提交 31040f29 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'develop' into feature/sangshuduo/TD-2598

...@@ -237,22 +237,22 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { ...@@ -237,22 +237,22 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
bool isReady = false; bool isReady = false;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVnode = pVgroup->vnodeGid + i; SVnodeGid *pVnode = pVgroup->vnodeGid + i;
SDnodeObj *pDnode = pVnode->pDnode;
if (pVnode == pRmVnode) continue; if (pVnode == pRmVnode) continue;
int32_t vver = mnodeGetVgidVer(pVnode->vver); int32_t vver = mnodeGetVgidVer(pVnode->vver);
mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d" , pVgroup->vgId, i, mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d", pVgroup->vgId, i,
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue; if (pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; if (pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) continue; if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) continue;
if (rmVnodeVer == 0 || vver >= rmVnodeVer) { if (rmVnodeVer == 0 || vver >= rmVnodeVer) {
mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d", pVgroup->vgId, i, mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d",
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer); pVgroup->vgId, i, pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
}
isReady = true; isReady = true;
} }
}
return isReady; return isReady;
} }
......
...@@ -184,7 +184,19 @@ void dnodeReprocessMWriteMsg(void *pMsg) { ...@@ -184,7 +184,19 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle, dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true); if (pWrite->pBatchMasterMsg) {
++pWrite->pBatchMasterMsg->received;
if (pWrite->pBatchMasterMsg->successed + pWrite->pBatchMasterMsg->received
>= pWrite->pBatchMasterMsg->expected) {
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
dnodeFreeMWriteMsg(pWrite);
}
mnodeDestroySubMsg(pWrite);
return;
}
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
dnodeFreeMWriteMsg(pWrite); dnodeFreeMWriteMsg(pWrite);
} else { } else {
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle, dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
......
...@@ -73,7 +73,8 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { ...@@ -73,7 +73,8 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
if (*numOfVnodes >= TSDB_MAX_VNODES) { if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES); dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
continue; closedir(dir);
return TSDB_CODE_DND_TOO_MANY_VNODES;
} else { } else {
vnodeList[*numOfVnodes - 1] = vnode; vnodeList[*numOfVnodes - 1] = vnode;
} }
......
...@@ -42,11 +42,12 @@ typedef struct SMnodeMsg { ...@@ -42,11 +42,12 @@ typedef struct SMnodeMsg {
struct SVgObj * pVgroup; struct SVgObj * pVgroup;
struct STableObj *pTable; struct STableObj *pTable;
struct SSTableObj*pSTable; struct SSTableObj*pSTable;
struct SMnodeMsg *pBatchMasterMsg;
SMnodeRsp rpcRsp; SMnodeRsp rpcRsp;
int8_t received; int16_t received;
int8_t successed; int16_t successed;
int8_t expected; int16_t expected;
int8_t retry; int16_t retry;
int32_t incomingTs; int32_t incomingTs;
int32_t code; int32_t code;
void * pObj; void * pObj;
...@@ -57,6 +58,7 @@ typedef struct SMnodeMsg { ...@@ -57,6 +58,7 @@ typedef struct SMnodeMsg {
void * mnodeCreateMsg(SRpcMsg *pRpcMsg); void * mnodeCreateMsg(SRpcMsg *pRpcMsg);
int32_t mnodeInitMsg(SMnodeMsg *pMsg); int32_t mnodeInitMsg(SMnodeMsg *pMsg);
void mnodeCleanupMsg(SMnodeMsg *pMsg); void mnodeCleanupMsg(SMnodeMsg *pMsg);
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg);
int32_t mnodeInitSystem(); int32_t mnodeInitSystem();
int32_t mnodeStartSystem(); int32_t mnodeStartSystem();
......
...@@ -194,6 +194,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out ...@@ -194,6 +194,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode") TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "No permission for disk files in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, 0, 0x0404, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, 0, 0x0404, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, 0, 0x0405, "Too many vnode directories")
// vnode // vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "Action in progress")
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tgrant.h" #include "tgrant.h"
#include "tqueue.h"
#include "hash.h" #include "hash.h"
#include "mnode.h" #include "mnode.h"
#include "dnode.h" #include "dnode.h"
...@@ -720,6 +721,133 @@ static void mnodeExtractTableName(char* tableId, char* name) { ...@@ -720,6 +721,133 @@ static void mnodeExtractTableName(char* tableId, char* name) {
} }
} }
static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize) {
SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize);
*pSubMsg = *pBatchMasterMsg;
//pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
pSubMsg->rpcMsg.pCont = pSubMsg->pCont;
pSubMsg->successed = 0;
pSubMsg->expected = 0;
SCMCreateTableMsg *pCM = pSubMsg->rpcMsg.pCont;
pCM->numOfTables = htonl(1);
pCM->contLen = htonl(contSize);
return pSubMsg;
}
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg) {
if (pSubMsg) {
// pUser is retained in batch master msg
if (pSubMsg->pDb) mnodeDecDbRef(pSubMsg->pDb);
if (pSubMsg->pVgroup) mnodeDecVgroupRef(pSubMsg->pVgroup);
if (pSubMsg->pTable) mnodeDecTableRef(pSubMsg->pTable);
if (pSubMsg->pSTable) mnodeDecTableRef(pSubMsg->pSTable);
if (pSubMsg->pAcct) mnodeDecAcctRef(pSubMsg->pAcct);
if (pSubMsg->pDnode) mnodeDecDnodeRef(pSubMsg->pDnode);
taosFreeQitem(pSubMsg);
}
}
static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreateTable->db);
if (pMsg->pDb == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreateTable->tableId);
if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreateTable->getMeta) {
mDebug("msg:%p, app:%p table:%s, continue to get meta", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return mnodeGetChildTableMeta(pMsg);
} else if (pCreateTable->igExists) {
mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
return TSDB_CODE_SUCCESS;
} else {
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId);
return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
}
}
if (pCreateTable->numOfTags != 0) {
mDebug("msg:%p, app:%p table:%s, create stable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mDebug("msg:%p, app:%p table:%s, create ctable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
pCreateTable->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pBatchMasterMsg == NULL) { // batch master first round
pMsg->pBatchMasterMsg = pMsg;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t numOfTables = htonl(pCreate->numOfTables);
int32_t contentLen = htonl(pCreate->contLen);
pMsg->expected = numOfTables;
int32_t code = TSDB_CODE_SUCCESS;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *) ((char *) pCreate + contentLen); p = (SCreateTableMsg *) ((char *) p + htonl(p->len))) {
SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len));
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
code = mnodeValidateCreateTableMsg(p, pSubMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pSubMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pSubMsg);
continue;
}
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pSubMsg);
return code;
}
}
if (pMsg->successed >= pMsg->expected) {
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
} else {
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pMsg);
}
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pMsg);
return code;
}
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
} else { // batch master replay, reprocess the whole batch
assert(0);
}
}
}
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
...@@ -729,6 +857,11 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -729,6 +857,11 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
// todo return error // todo return error
} }
// batch master msg first round or reprocessing and batch sub msg reprocessing
if (numOfTables > 1 || pMsg->pBatchMasterMsg != NULL) {
return mnodeProcessBatchCreateTableMsg(pMsg);
}
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg)); SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg));
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(p->db); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(p->db);
if (pMsg->pDb == NULL) { if (pMsg->pDb == NULL) {
...@@ -1737,6 +1870,18 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1737,6 +1870,18 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
mDebug("msg:%p, app:%p table:%s, created in dnode, thandle:%p", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, mDebug("msg:%p, app:%p table:%s, created in dnode, thandle:%p", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
pMsg->rpcMsg.handle); pMsg->rpcMsg.handle);
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->successed;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
mnodeDestroySubMsg(pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS); dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
...@@ -2477,6 +2622,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2477,6 +2622,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
mnodeSendDropChildTableMsg(pMsg, false); mnodeSendDropChildTableMsg(pMsg, false);
rpcMsg->code = TSDB_CODE_SUCCESS; rpcMsg->code = TSDB_CODE_SUCCESS;
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->successed;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code); dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
return; return;
} }
...@@ -2494,6 +2652,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2494,6 +2652,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pTable = NULL; pMsg->pTable = NULL;
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, code); dnodeSendRpcMWriteRsp(pMsg, code);
} }
} else { } else {
...@@ -2519,6 +2690,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2519,6 +2690,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
//Avoid retry again in client //Avoid retry again in client
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY; rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
} }
if (pMsg->pBatchMasterMsg) {
++pMsg->pBatchMasterMsg->received;
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) {
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
}
mnodeDestroySubMsg(pMsg);
return;
}
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code); dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
} }
} }
......
...@@ -50,9 +50,16 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char * ...@@ -50,9 +50,16 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char *
char head[512] = {0}; char head[512] = {0};
char body[512] = {0}; char body[512] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_ERROR], errNo, desc); int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_ERROR], errNo, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_ERROR], httpVersionStr[pContext->parser->httpVersion], int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_ERROR], httpVersionStr[httpVersion], httpCode,
httpCode, httpCodeStr, httpKeepAliveStr[pContext->parser->keepAlive], bodyLen); httpCodeStr, httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen); httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen); httpWriteBuf(pContext, body, bodyLen);
...@@ -164,9 +171,16 @@ void httpSendSuccResp(HttpContext *pContext, char *desc) { ...@@ -164,9 +171,16 @@ void httpSendSuccResp(HttpContext *pContext, char *desc) {
char head[1024] = {0}; char head[1024] = {0};
char body[1024] = {0}; char body[1024] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc); int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OK], httpVersionStr[pContext->parser->httpVersion], int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OK], httpVersionStr[httpVersion],
httpKeepAliveStr[pContext->parser->keepAlive], bodyLen); httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen); httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen); httpWriteBuf(pContext, body, bodyLen);
...@@ -177,9 +191,16 @@ void httpSendOptionResp(HttpContext *pContext, char *desc) { ...@@ -177,9 +191,16 @@ void httpSendOptionResp(HttpContext *pContext, char *desc) {
char head[1024] = {0}; char head[1024] = {0};
char body[1024] = {0}; char body[1024] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc); int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OPTIONS], httpVersionStr[pContext->parser->httpVersion], int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OPTIONS], httpVersionStr[httpVersion],
httpKeepAliveStr[pContext->parser->keepAlive], bodyLen); httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen); httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen); httpWriteBuf(pContext, body, bodyLen);
......
...@@ -503,9 +503,10 @@ void *syncRetrieveData(void *param) { ...@@ -503,9 +503,10 @@ void *syncRetrieveData(void *param) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
// The ref is obtained in both the create thread and the current thread, so it is released twice // The ref is obtained in both the create thread and the current thread, so it is released twice
sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
return NULL; return NULL;
} }
...@@ -101,8 +101,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -101,8 +101,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
return syncCode; return syncCode;
} }
static int32_t vnodeCheckWrite(void *vparam) { static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
SVnodeObj *pVnode = vparam;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
...@@ -216,29 +215,21 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -216,29 +215,21 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32_t qtype, SRpcMsg *pRpcMsg) {
SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam;
int32_t code = 0;
if (qtype == TAOS_QTYPE_RPC) {
code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
}
if (pHead->len > TSDB_MAX_WAL_SIZE) { if (pHead->len > TSDB_MAX_WAL_SIZE) {
vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version); vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
return TSDB_CODE_WAL_SIZE_LIMIT; terrno = TSDB_CODE_WAL_SIZE_LIMIT;
return NULL;
} }
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size); SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) { if (pWrite == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY; terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
} }
if (rparam != NULL) { if (pRpcMsg != NULL) {
SRpcMsg *pRpcMsg = rparam;
pWrite->rpcMsg = *pRpcMsg; pWrite->rpcMsg = *pRpcMsg;
} }
...@@ -248,6 +239,21 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -248,6 +239,21 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
return pWrite;
}
static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pWrite->qtype == TAOS_QTYPE_RPC) {
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return code;
}
}
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) { if (queued > MAX_QUEUED_MSG_NUM) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
...@@ -256,15 +262,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -256,15 +262,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
taosMsleep(ms); taosMsleep(ms);
} }
code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return 0;
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite); taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVWriteMsg *pWrite = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam);
if (pWrite == NULL) {
assert(terrno != 0);
return terrno;
}
int32_t code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return 0;
return vnodeWriteToWQueueImp(pWrite);
}
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
...@@ -294,7 +310,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { ...@@ -294,7 +310,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite, vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite,
pWrite->processedCount); pWrite->processedCount);
pWrite->processedCount = 0; pWrite->processedCount = 0;
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); code = vnodeWriteToWQueueImp(pWrite);
if (code != 0) {
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
}
} }
} }
} }
......
...@@ -28,6 +28,7 @@ class TDTestCase: ...@@ -28,6 +28,7 @@ class TDTestCase:
def run(self): def run(self):
time.sleep(5)
tdSql.execute("use log") tdSql.execute("use log")
tdSql.execute("create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s)") tdSql.execute("create table cpustrm as select count(*), avg(cpu_taosd), max(cpu_taosd), min(cpu_taosd), avg(cpu_system), max(cpu_cores), min(cpu_cores), last(cpu_cores) from log.dn1 interval(4s)")
......
...@@ -99,9 +99,11 @@ print ========= step2 alter db ...@@ -99,9 +99,11 @@ print ========= step2 alter db
sql_error alter database d1 replica 1 sql_error alter database d1 replica 1
sql_error alter database d2 replica 1 sql_error alter database d2 replica 1
sql_error alter database d3 replica 1 sql_error alter database d3 replica 1
sql_error alter database d4 replica 1
sql alter database d1 replica 2 sql alter database d1 replica 2
sql alter database d2 replica 2 sql alter database d2 replica 2
sql alter database d3 replica 2 sql alter database d3 replica 2
sql alter database d4 replica 2
$x = 0 $x = 0
a2: a2:
...@@ -129,9 +131,16 @@ if $data03 != 2 then ...@@ -129,9 +131,16 @@ if $data03 != 2 then
goto a2 goto a2
endi endi
sql show d4.vgroups
print online vnodes $data03
if $data03 != 2 then
goto a2
endi
sql alter database d1 replica 1 sql alter database d1 replica 1
sql alter database d2 replica 1 sql alter database d2 replica 1
sql alter database d3 replica 1 sql alter database d3 replica 1
sql alter database d4 replica 1
$x = 0 $x = 0
a1: a1:
...@@ -159,6 +168,27 @@ if $data03 != 1 then ...@@ -159,6 +168,27 @@ if $data03 != 1 then
goto a1 goto a1
endi endi
sql show d4.vgroups
print online vnodes $data03
if $data03 != 1 then
goto a1
endi
sql show dnodes
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
if $data02 != 0 then
goto a1
endi
if $data12 != 2 then
goto a1
endi
if $data22 != 2 then
goto a1
endi
print ========= step3 print ========= step3
sql reset query cache sql reset query cache
sleep 100 sleep 100
...@@ -192,6 +222,7 @@ print ========= step4 alter db ...@@ -192,6 +222,7 @@ print ========= step4 alter db
sql alter database d1 replica 2 sql alter database d1 replica 2
sql alter database d2 replica 2 sql alter database d2 replica 2
sql alter database d3 replica 2 sql alter database d3 replica 2
sql alter database d4 replica 2
$x = 0 $x = 0
step4: step4:
...@@ -219,6 +250,12 @@ if $data03 != 2 then ...@@ -219,6 +250,12 @@ if $data03 != 2 then
goto step4 goto step4
endi endi
sql show d4.vgroups
print online vnodes $data03
if $data03 != 2 then
goto step4
endi
sql insert into d1.t1 values(now, 3) sql insert into d1.t1 values(now, 3)
sql insert into d2.t2 values(now, 3) sql insert into d2.t2 values(now, 3)
sql insert into d3.t3 values(now, 3) sql insert into d3.t3 values(now, 3)
...@@ -287,3 +324,4 @@ sql select * from d4.t4 ...@@ -287,3 +324,4 @@ sql select * from d4.t4
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册