提交 97434ac0 编写于 作者: S Shengliang Guan

TD-1795

上级 a988483a
......@@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue;
static SSdbWorkerPool tsSdbPool;
static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused);
static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteFwdToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action);
static void sdbFreeFromQueue(SSdbRow *pRow);
static void * sdbWorkerFp(void *pWorker);
......@@ -261,12 +261,6 @@ static void sdbHandleFailedConfirm(SSdbRow *pRow) {
SSdbRow row = {.type = SDB_OPER_GLOBAL, .pTable = pRow->pTable, .pObj = pRow->pObj};
sdbDeleteRow(&row);
}
// Drop database/stable may take a long time and cause a timeout, so confirm is not enforced
if (action == SDB_ACTION_DELETE && pRow->code == TSDB_CODE_SYN_CONFIRM_EXPIRED) {
sdbDebug("vgId:1, confirm is not enforced while perform drop operation, set it success");
pRow->code = TSDB_CODE_SUCCESS;
}
}
FORCE_INLINE
......@@ -378,7 +372,7 @@ void sdbUpdateSync(void *pMnodes) {
sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteWalToQueue;
syncInfo.writeToCache = sdbWriteFwdToQueue;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole;
tsSdbMgmt.cfg = syncCfg;
......@@ -565,7 +559,36 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) {
return TSDB_CODE_SUCCESS;
}
static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
static int32_t sdbPerformInsertAction(SWalHead *pHead, SSdbTable *pTable) {
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
(*pTable->fpDecode)(&row);
return sdbInsertHash(pTable, &row);
}
static int32_t sdbPerformDeleteAction(SWalHead *pHead, SSdbTable *pTable) {
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
if (pObj == NULL) {
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name,
sdbGetKeyStr(pTable, pHead->cont));
return TSDB_CODE_SUCCESS;
}
SSdbRow row = {.pTable = pTable, .pObj = pObj};
return sdbDeleteHash(pTable, &row);
}
static int32_t sdbPerformUpdateAction(SWalHead *pHead, SSdbTable *pTable) {
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
if (pObj == NULL) {
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name,
sdbGetKeyStr(pTable, pHead->cont));
return TSDB_CODE_SUCCESS;
}
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
(*pTable->fpDecode)(&row);
return sdbUpdateHash(pTable, &row);
}
static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) {
SSdbRow *pRow = wparam;
SWalHead *pHead = hparam;
int32_t tableId = pHead->msgType / 10;
......@@ -574,6 +597,8 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus
SSdbTable *pTable = sdbGetTableFromId(tableId);
assert(pTable != NULL);
if (qtype == TAOS_QTYPE_QUERY) return sdbPerformDeleteAction(pHead, pTable);
pthread_mutex_lock(&tsSdbMgmt.mutex);
if (pHead->version == 0) {
......@@ -633,28 +658,17 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus
// from wal or forward msg, row not created, should add into hash
if (action == SDB_ACTION_INSERT) {
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
code = (*pTable->fpDecode)(&row);
return sdbInsertHash(pTable, &row);
return sdbPerformInsertAction(pHead, pTable);
} else if (action == SDB_ACTION_DELETE) {
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
if (pObj == NULL) {
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name,
sdbGetKeyStr(pTable, pHead->cont));
if (qtype == TAOS_QTYPE_FWD) {
// Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue
sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
return TSDB_CODE_SUCCESS;
} else {
return sdbPerformDeleteAction(pHead, pTable);
}
SSdbRow row = {.pTable = pTable, .pObj = pObj};
return sdbDeleteHash(pTable, &row);
} else if (action == SDB_ACTION_UPDATE) {
void *pObj = sdbGetRowMeta(pTable, pHead->cont);
if (pObj == NULL) {
sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name,
sdbGetKeyStr(pTable, pHead->cont));
return TSDB_CODE_SUCCESS;
}
SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable};
code = (*pTable->fpDecode)(&row);
return sdbUpdateHash(pTable, &row);
return sdbPerformUpdateAction(pHead, pTable);
} else {
return TSDB_CODE_MND_INVALID_MSG_TYPE;
}
......@@ -972,7 +986,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) {
taosFreeQitem(pRow);
}
static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
SWalHead *pHead = wparam;
int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册