diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4b5392257ffa072c53c4332733518c8f1f68f8bd..17eb5714e0272bcebe21de84d938f94708b7c4d9 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -206,7 +206,7 @@ int32_t tsNumOfLogLines = 10000000; int32_t mDebugFlag = 131; int32_t sdbDebugFlag = 131; int32_t dDebugFlag = 135; -int32_t vDebugFlag = 131; +int32_t vDebugFlag = 135; int32_t cDebugFlag = 131; int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 3968d5b8c9dfbb53ad04179e0451fc9aab76980e..e278c3a7ccd145585b70d3488abaf1a9dceab571 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -161,7 +161,7 @@ void cqStop(void *handle) { return; } SCqContext *pContext = handle; - cInfo("vgId:%d, stop all CQs", pContext->vgId); + cDebug("vgId:%d, stop all CQs", pContext->vgId); if (pContext->dbConn == NULL || pContext->master == 0) return; pthread_mutex_lock(&pContext->mutex); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index e0d7e018434f15d96fe559440ba6b07f027cd930..be33262f7f75f20485c977c3b3a7378a6021481a 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -206,9 +206,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Database memory is full for commit failed") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, 0, 0x050C, "Database memory is full for waiting commit") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") // tsdb diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 48342b38138a560f3293de2ea43e8d66e595252d..6cc4e097350783a93300c46adbd5f68faf43e143 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -35,7 +35,7 @@ #include "mnodeSdb.h" #define SDB_TABLE_LEN 12 -#define MAX_QUEUED_MSG_NUM 1024 +#define MAX_QUEUED_MSG_NUM 10000 typedef enum { SDB_ACTION_INSERT = 0, diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 03d1272771d599e9115896148007c9b1fe310151..7ad97396c2df2b105056861e5c2916de92c7d8d9 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -133,7 +133,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg); if (code == TSDB_CODE_SUCCESS) { - vDebug("QInfo:%p add to vread queue for exec query", *qhandle); + vTrace("QInfo:%p add to vread queue for exec query", *qhandle); } return code; @@ -164,7 +164,7 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, } } else { *freeHandle = true; - vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); + vTrace("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); } } else { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -266,7 +266,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } if (handle != NULL) { - vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); + vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle); if (code != TSDB_CODE_SUCCESS) { pRsp->code = code; @@ -278,7 +278,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { assert(pCont != NULL); void **qhandle = (void **)pRead->qhandle; - vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); + vTrace("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); // In the retrieve blocking model, only 50% CPU will be used in query processing if (tsHalfCoresForQuery) { @@ -294,7 +294,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle); assert(pRead->rpcHandle != NULL); - vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, + vTrace("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, pRead->rpcHandle); // set the real rsp error code @@ -327,7 +327,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { pRetrieve->free = htons(pRetrieve->free); pRetrieve->qhandle = htobe64(pRetrieve->qhandle); - vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, + vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, pRetrieve->free, pRead->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); @@ -410,6 +410,6 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { pMsg->header.vgId = htonl(vgId); pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); - vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); + vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index b23eeb207d32b079d80a18d38874875f8d0275f4..cd462f7f0a1e37b514e9ffd3a890880918a149d6 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -23,7 +23,7 @@ #include "dnode.h" #include "vnodeStatus.h" -#define MAX_QUEUED_MSG_NUM 1024 +#define MAX_QUEUED_MSG_NUM 10000 extern void * tsDnodeTmr; static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); @@ -271,6 +271,8 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { SVnodeObj * pVnode = pWrite->pVnode; int32_t code = TSDB_CODE_VND_SYNCING; + if (pVnode->flowctrlLevel <= 0) code = TSDB_CODE_VND_IS_FLOWCTRL; + pWrite->processedCount++; if (pWrite->processedCount > 100) { vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code), @@ -290,8 +292,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { SVnodeObj *pVnode = pWrite->pVnode; - if (pVnode->flowctrlLevel <= 0) return 0; - if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; + if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM) { + if (pVnode->flowctrlLevel <= 0) return 0; + if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; + } if (tsFlowCtrl == 0) { int32_t ms = pow(2, pVnode->flowctrlLevel + 2);