提交 b03ed525 编写于 作者: S Shengliang Guan

[TD-860] change sync confirm in sdb

上级 dc7e5963
......@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
taosFreeQitem(pWrite);
}
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
SMnodeMsg *pWrite = pRaw;
void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
SMnodeMsg *pWrite = pMsg;
if (pWrite == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
......@@ -140,6 +140,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
return;
}
if (code > 0) return;
SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rpcRsp.rsp,
......
......@@ -72,8 +72,6 @@ typedef struct {
void * sync;
void * wal;
SSyncCfg cfg;
sem_t sem;
int32_t code;
int32_t numOfTables;
SSdbTable *tableList[SDB_TABLE_MAX];
pthread_mutex_t mutex;
......@@ -244,27 +242,19 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
sdbUpdateMnodeRoles();
}
FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tsSdbObj.code = code;
sem_post(&tsSdbObj.sem);
sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code));
}
static int32_t sdbForwardToPeer(SWalHead *pHead) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
SMnodeMsg *pMsg = param;
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
if (code > 0) {
sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
sem_wait(&tsSdbObj.sem);
return tsSdbObj.code;
}
return code;
if (pMsg) {
sdbDebug("app:%p:%p, forward request is confirmed, result:%s", pMsg->rpcMsg.ahandle, pMsg, tstrerror(code));
}
dnodeSendRpcMnodeWriteRsp(pMsg, code);
}
void sdbUpdateSync() {
SSyncCfg syncCfg = {0};
int32_t index = 0;
int32_t index = 0;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
......@@ -298,7 +288,7 @@ void sdbUpdateSync() {
}
syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false;
for (int32_t i = 0; i < syncCfg.replica; ++i) {
......@@ -325,10 +315,10 @@ void sdbUpdateSync() {
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteToQueue;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole;
tsSdbObj.cfg = syncCfg;
if (tsSdbObj.sync) {
syncReconfig(tsSdbObj.sync, &syncCfg);
} else {
......@@ -339,7 +329,6 @@ void sdbUpdateSync() {
int32_t sdbInit() {
pthread_mutex_init(&tsSdbObj.mutex, NULL);
sem_init(&tsSdbObj.sem, 0, 0);
if (sdbInitWriteWorker() != 0) {
return -1;
......@@ -379,7 +368,6 @@ void sdbCleanUp() {
tsSdbObj.wal = NULL;
}
sem_destroy(&tsSdbObj.sem);
pthread_mutex_destroy(&tsSdbObj.mutex);
}
......@@ -513,24 +501,22 @@ static int sdbWrite(void *param, void *data, int type) {
assert(pTable != NULL);
pthread_mutex_lock(&tsSdbObj.mutex);
if (pHead->version == 0) {
// assign version
// assign version
tsSdbObj.version++;
pHead->version = tsSdbObj.version;
} else {
// for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
tsSdbObj.version);
sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
return TSDB_CODE_MND_APP_ERROR;
} else {
tsSdbObj.version = pHead->version;
......@@ -543,27 +529,33 @@ static int sdbWrite(void *param, void *data, int type) {
return code;
}
code = sdbForwardToPeer(pHead);
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
void *mhandle = NULL;
if (pOper != NULL) mhandle = pOper->pMsg;
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, mhandle, TAOS_QTYPE_RPC);
pthread_mutex_unlock(&tsSdbObj.mutex);
if (syncCode < 0) {
sdbDebug("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
return syncCode;
} else if (syncCode > 0) {
sdbDebug("table:%s, forward request is sent, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else {}
// from app, oper is created
if (pOper != NULL) {
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
tstrerror(code));
return code;
}
// from wal or forward msg, oper not created, should add into hash
if (tsSdbObj.sync != NULL) {
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
sdbDebug("table:%s, record from app is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
return syncCode;
} else {
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
sdbDebug("table:%s, record from wal/fwd is disposed, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(code), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
// from wal or forward msg, oper not created, should add into hash
if (action == SDB_ACTION_INSERT) {
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper);
......@@ -944,17 +936,16 @@ static void *sdbWorkerFp(void *param) {
if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item;
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
if (pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
} else {
pHead = (SWalHead *)item;
pOper = NULL;
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
int32_t code = sdbWrite(pOper, pHead, type);
if (pOper) pOper->retCode = code;
}
......@@ -965,23 +956,24 @@ static void *sdbWorkerFp(void *param) {
taosResetQitems(tsSdbWriteQall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item;
if (pOper != NULL && pOper->cb != NULL) {
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
if (pOper == NULL) {
taosFreeQitem(item);
continue;
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
tstrerror(pOper->retCode));
}
if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj);
if (pOper->cb != NULL) {
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
sdbDecRef(pOper->table, pOper->pObj);
} else if (type == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
} else {
}
taosFreeQitem(item);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册