未验证 提交 70d880da 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2647 from taosdata/feature/vnode

Feature/vnode
......@@ -94,6 +94,7 @@ void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle);
int32_t sdbGetId(void *handle);
uint64_t sdbGetVersion();
bool sdbCheckRowDeleted(void *thandle, void *pRow);
#ifdef __cplusplus
}
......
......@@ -661,6 +661,14 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS;
}
bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
SSdbTable *pTable = pTableInput;
if (pTable == NULL) return false;
int8_t *updateEnd = pRow + pTable->refCountPos - 1;
return (*updateEnd == 1);
}
int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
......
......@@ -72,7 +72,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn);
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg);
......@@ -754,7 +754,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable;
mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid);
return mnodeProcessDropChildTableMsg(pMsg);
return mnodeProcessDropChildTableMsg(pMsg, true);
}
}
......@@ -1758,7 +1758,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
}
}
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
......@@ -1793,6 +1793,8 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
if (!needReturn) rpcMsg.ahandle = NULL;
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -2246,6 +2248,14 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) {
mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64,
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid);
mnodeProcessDropChildTableMsg(mnodeMsg, false);
rpcMsg->code = TSDB_CODE_SUCCESS;
}
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
if (pCreate->getMeta) {
......
......@@ -105,7 +105,7 @@
#define HTTP_OP_VALUE_TYPE 79
//tgf
#define HTTP_TG_STABLE_NOT_EXIST 80
#define HTTP_TG_STABLE_NOT_EXIST 80
extern char *httpMsg[];
......
......@@ -61,6 +61,9 @@
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_READ_DATA_SUCCESS 0
#define HTTP_READ_DATA_FAILED 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
......
......@@ -23,6 +23,6 @@ void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
bool httpReadDataImp(HttpContext *pContext);
int httpReadDataImp(HttpContext *pContext);
#endif
......@@ -60,6 +60,7 @@ bool httpParseURL(HttpContext* pContext) {
char* pSeek;
char* pEnd = strchr(pParser->pLast, ' ');
if (pEnd == NULL) {
httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL);
return false;
}
......@@ -275,14 +276,14 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test)
return true;
}
bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
int httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) {
bool parsedOk = httpParseChunkedBody(pContext, pParser, true);
if (parsedOk) {
httpParseChunkedBody(pContext, pParser, false);
return HTTP_CHECK_BODY_SUCCESS;
} else {
httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr);
if (!httpReadDataImp(pContext)) {
if (httpReadDataImp(pContext) != HTTP_READ_DATA_SUCCESS) {
httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr);
return HTTP_CHECK_BODY_ERROR;
} else {
......@@ -296,7 +297,6 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) {
if (dataReadLen > pParser->data.len) {
httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d",
pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len);
httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR);
return HTTP_CHECK_BODY_ERROR;
} else if (dataReadLen < pParser->data.len) {
httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read",
......@@ -358,20 +358,13 @@ bool httpParseRequest(HttpContext* pContext) {
}
int httpCheckReadCompleted(HttpContext* pContext) {
HttpParser *pParser = &pContext->parser;
HttpParser* pParser = &pContext->parser;
if (pContext->httpChunked == HTTP_UNCUNKED) {
int ret = httpReadUnChunkedBody(pContext, pParser);
if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret;
}
return httpReadUnChunkedBody(pContext, pParser);
} else {
int ret = httpReadChunkedBody(pContext, pParser);
if (ret != HTTP_CHECK_BODY_SUCCESS) {
return ret;
}
return httpReadChunkedBody(pContext, pParser);
}
return HTTP_CHECK_BODY_SUCCESS;
}
bool httpDecodeRequest(HttpContext* pContext) {
......
......@@ -69,7 +69,7 @@ void httpCleanUpConnect() {
httpDebug("http server:%s is cleaned up", pServer->label);
}
bool httpReadDataImp(HttpContext *pContext) {
int httpReadDataImp(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser;
while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
......@@ -85,8 +85,7 @@ bool httpReadDataImp(HttpContext *pContext) {
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
httpReleaseContext(pContext);
return false;
return HTTP_READ_DATA_FAILED;
}
} else {
pParser->bufsize += nread;
......@@ -95,15 +94,13 @@ bool httpReadDataImp(HttpContext *pContext) {
if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG);
httpNotifyContextClose(pContext);
return false;
return HTTP_REQUSET_TOO_BIG;
}
}
pParser->buffer[pParser->bufsize] = 0;
return true;
return HTTP_READ_DATA_SUCCESS;
}
static bool httpDecompressData(HttpContext *pContext) {
......@@ -141,8 +138,14 @@ static bool httpReadData(HttpContext *pContext) {
httpInitContext(pContext);
}
if (!httpReadDataImp(pContext)) {
httpNotifyContextClose(pContext);
int32_t code = httpReadDataImp(pContext);
if (code != HTTP_READ_DATA_SUCCESS) {
if (code == HTTP_READ_DATA_FAILED) {
httpReleaseContext(pContext);
} else {
httpSendErrorResp(pContext, code);
httpNotifyContextClose(pContext);
}
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册