diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index ff91989e5f15b00775fd02505704a3afccaab500..f1e736130c5f036e59b39e65f4533c5b473e294e 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -259,6 +259,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") @@ -365,7 +366,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, 0, 0x11A4, "tag value TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string") - TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2100, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, 0, 0x2101, "convertion not a valid literal input") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined") diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 36e1a1eb2b1b8316b2cb4eee1ad40fa93e9b06ac..e5fda26687224066b10ee4fe24ed05a34a5f1648 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -107,7 +107,7 @@ static taos_queue tsSdbWQueue; static SSdbWorkerPool tsSdbPool; static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); -static int32_t sdbWriteWalToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam); +static int32_t sdbWriteFwdToQueue(int32_t vgId, void *pHead, int32_t qtype, void *rparam); static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static void sdbFreeFromQueue(SSdbRow *pRow); static void * sdbWorkerFp(void *pWorker); @@ -372,7 +372,7 @@ void sdbUpdateSync(void *pMnodes) { sprintf(syncInfo.path, "%s", tsMnodeDir); syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getFileInfo = sdbGetFileInfo; - syncInfo.writeToCache = sdbWriteWalToQueue; + syncInfo.writeToCache = sdbWriteFwdToQueue; syncInfo.confirmForward = sdbConfirmForward; syncInfo.notifyRole = sdbNotifyRole; tsSdbMgmt.cfg = syncCfg; @@ -559,7 +559,36 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { return TSDB_CODE_SUCCESS; } -static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { +static int32_t sdbPerformInsertAction(SWalHead *pHead, SSdbTable *pTable) { + SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + (*pTable->fpDecode)(&row); + return sdbInsertHash(pTable, &row); +} + +static int32_t sdbPerformDeleteAction(SWalHead *pHead, SSdbTable *pTable) { + void *pObj = sdbGetRowMeta(pTable, pHead->cont); + if (pObj == NULL) { + sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name, + sdbGetKeyStr(pTable, pHead->cont)); + return TSDB_CODE_SUCCESS; + } + SSdbRow row = {.pTable = pTable, .pObj = pObj}; + return sdbDeleteHash(pTable, &row); +} + +static int32_t sdbPerformUpdateAction(SWalHead *pHead, SSdbTable *pTable) { + void *pObj = sdbGetRowMeta(pTable, pHead->cont); + if (pObj == NULL) { + sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name, + sdbGetKeyStr(pTable, pHead->cont)); + return TSDB_CODE_SUCCESS; + } + SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; + (*pTable->fpDecode)(&row); + return sdbUpdateHash(pTable, &row); +} + +static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unused) { SSdbRow *pRow = wparam; SWalHead *pHead = hparam; int32_t tableId = pHead->msgType / 10; @@ -568,6 +597,8 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus SSdbTable *pTable = sdbGetTableFromId(tableId); assert(pTable != NULL); + if (qtype == TAOS_QTYPE_QUERY) return sdbPerformDeleteAction(pHead, pTable); + pthread_mutex_lock(&tsSdbMgmt.mutex); if (pHead->version == 0) { @@ -627,28 +658,17 @@ static int sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *unus // from wal or forward msg, row not created, should add into hash if (action == SDB_ACTION_INSERT) { - SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; - code = (*pTable->fpDecode)(&row); - return sdbInsertHash(pTable, &row); + return sdbPerformInsertAction(pHead, pTable); } else if (action == SDB_ACTION_DELETE) { - void *pObj = sdbGetRowMeta(pTable, pHead->cont); - if (pObj == NULL) { - sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore delete action", pTable->name, - sdbGetKeyStr(pTable, pHead->cont)); + if (qtype == TAOS_QTYPE_FWD) { + // Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue + sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused); return TSDB_CODE_SUCCESS; + } else { + return sdbPerformDeleteAction(pHead, pTable); } - SSdbRow row = {.pTable = pTable, .pObj = pObj}; - return sdbDeleteHash(pTable, &row); } else if (action == SDB_ACTION_UPDATE) { - void *pObj = sdbGetRowMeta(pTable, pHead->cont); - if (pObj == NULL) { - sdbDebug("vgId:1, sdb:%s, object:%s not exist in hash, ignore update action", pTable->name, - sdbGetKeyStr(pTable, pHead->cont)); - return TSDB_CODE_SUCCESS; - } - SSdbRow row = {.rowSize = pHead->len, .rowData = pHead->cont, .pTable = pTable}; - code = (*pTable->fpDecode)(&row); - return sdbUpdateHash(pTable, &row); + return sdbPerformUpdateAction(pHead, pTable); } else { return TSDB_CODE_MND_INVALID_MSG_TYPE; } @@ -966,7 +986,7 @@ static void sdbFreeFromQueue(SSdbRow *pRow) { taosFreeQitem(pRow); } -static int32_t sdbWriteWalToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { +static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { SWalHead *pHead = wparam; int32_t size = sizeof(SSdbRow) + sizeof(SWalHead) + pHead->len; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index bc3803b732d9f089452efa084c1050eb21a04628..d2d6d2d7fae03e2341c58e73ff4984046655d682 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1279,7 +1279,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, pFwdInfo->version, time, pFwdInfo->time); - syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_SYN_CONFIRM_EXPIRED); } syncRemoveConfirmedFwdInfo(pNode); diff --git a/tests/script/tmp/mnodes.sim b/tests/script/tmp/mnodes.sim index 48dbc19cb2a083a9d59cca782a66532b57f5dd4a..de02ae741b75d4764f280d4aca5eb9cb139aa7ff 100644 --- a/tests/script/tmp/mnodes.sim +++ b/tests/script/tmp/mnodes.sim @@ -20,6 +20,10 @@ system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000 system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000 system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20 + system sh/cfg.sh -n dnode1 -c replica -v 3 system sh/cfg.sh -n dnode2 -c replica -v 3 system sh/cfg.sh -n dnode3 -c replica -v 3