未验证 提交 ffed4483 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4304 from taosdata/feature/wal

TD-2046
...@@ -168,7 +168,8 @@ static void *dnodeProcessMReadQueue(void *param) { ...@@ -168,7 +168,8 @@ static void *dnodeProcessMReadQueue(void *param) {
break; break;
} }
dDebug("%p, msg:%s will be processed in mread queue", pRead->rpcMsg.ahandle, taosMsg[pRead->rpcMsg.msgType]); dDebug("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead,
taosMsg[pRead->rpcMsg.msgType]);
int32_t code = mnodeProcessRead(pRead); int32_t code = mnodeProcessRead(pRead);
dnodeSendRpcMReadRsp(pRead, code); dnodeSendRpcMReadRsp(pRead, code);
} }
......
...@@ -127,7 +127,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { ...@@ -127,7 +127,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, true);
} else { } else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
} }
...@@ -136,7 +136,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { ...@@ -136,7 +136,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
} }
static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) { static void dnodeFreeMWriteMsg(SMnodeMsg *pWrite) {
dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite, dDebug("msg:%p, app:%p type:%s is freed from mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite); mnodeCleanupMsg(pWrite);
...@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) { ...@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
break; break;
} }
dDebug("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite, dDebug("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]); taosMsg[pWrite->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWrite); int32_t code = mnodeProcessWrite(pWrite);
...@@ -188,13 +188,13 @@ void dnodeReprocessMWriteMsg(void *pMsg) { ...@@ -188,13 +188,13 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg; SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning() || tsMWriteQueue == NULL) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
dDebug("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry); taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMWriteMsg(pWrite); dnodeFreeMWriteMsg(pWrite);
} else { } else {
dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite, dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry); taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
......
...@@ -216,7 +216,7 @@ static void *dnodeProcessMgmtQueue(void *param) { ...@@ -216,7 +216,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
} }
pMsg = &pMgmt->rpcMsg; pMsg = &pMgmt->rpcMsg;
dDebug("%p, msg:%p:%s will be processed", pMsg->ahandle, pMgmt, taosMsg[pMsg->msgType]); dDebug("msg:%p, ahandle:%p type:%s will be processed", pMgmt, pMsg->ahandle, taosMsg[pMsg->msgType]);
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else { } else {
......
...@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) { ...@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) {
break; break;
} }
dDebug("%p, msg:%p:%s will be processed in vread queue, qtype:%d", pRead->rpcAhandle, pRead, dDebug("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle,
taosMsg[pRead->msgType], qtype); taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pRead); int32_t code = vnodeProcessRead(pVnode, pRead);
......
...@@ -205,8 +205,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -205,8 +205,8 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
bool forceFsync = false; bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite->rpcAhandle, pWrite, dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
......
...@@ -48,7 +48,7 @@ void *mnodeCreateMsg(SRpcMsg *pRpcMsg) { ...@@ -48,7 +48,7 @@ void *mnodeCreateMsg(SRpcMsg *pRpcMsg) {
int32_t mnodeInitMsg(SMnodeMsg *pMsg) { int32_t mnodeInitMsg(SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) { if (pMsg->pUser != NULL) {
mDebug("app:%p:%p, user info already inited", pMsg->rpcMsg.ahandle, pMsg); mTrace("msg:%p, app:%p user info already inited", pMsg, pMsg->rpcMsg.ahandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -47,7 +47,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -47,7 +47,7 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mpeer queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; return TSDB_CODE_MND_INVALID_MSG_LEN;
} }
...@@ -58,8 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -58,8 +58,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps; epSet->inUse = (i + 1) % epSet->numOfEps;
...@@ -73,7 +73,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -73,7 +73,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType] == NULL) { if (tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mpeer queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
...@@ -82,13 +83,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -82,13 +83,14 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
void mnodeProcessPeerRsp(SRpcMsg *pMsg) { void mnodeProcessPeerRsp(SRpcMsg *pMsg) {
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
mError("%p, msg:%s is not processed for it is not master", pMsg->ahandle, taosMsg[pMsg->msgType]); mError("msg:%p, ahandle:%p type:%s is not processed for it is not master", pMsg, pMsg->ahandle,
taosMsg[pMsg->msgType]);
return; return;
} }
if (tsMnodeProcessPeerRspFp[pMsg->msgType]) { if (tsMnodeProcessPeerRspFp[pMsg->msgType]) {
(*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg); (*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg);
} else { } else {
mError("%p, msg:%s is not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); mError("msg:%p, ahandle:%p type:%s is not processed", pMsg, pMsg->ahandle, taosMsg[pMsg->msgType]);
} }
} }
...@@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) { ...@@ -43,7 +43,7 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) {
int32_t mnodeProcessRead(SMnodeMsg *pMsg) { int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mread queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; return TSDB_CODE_MND_INVALID_MSG_LEN;
} }
...@@ -52,7 +52,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -52,7 +52,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
...@@ -70,13 +70,15 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -70,13 +70,15 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType] == NULL) { if (tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mread queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
int32_t code = mnodeInitMsg(pMsg); int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code)); mError("msg:%p, app:%p type:%s in mread queue, not processed reason:%s", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
return code; return code;
} }
......
...@@ -109,6 +109,7 @@ static SSdbWorkerPool tsSdbPool; ...@@ -109,6 +109,7 @@ static SSdbWorkerPool tsSdbPool;
static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused); static int32_t sdbProcessWrite(void *pRow, void *pHead, int32_t qtype, void *unused);
static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam); static int32_t sdbWriteWalToQueue(void *vparam, void *pHead, int32_t qtype, void *rparam);
static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action); static int32_t sdbWriteRowToQueue(SSdbRow *pRow, int32_t action);
static void sdbFreeFromQueue(SSdbRow *pRow);
static void * sdbWorkerFp(void *pWorker); static void * sdbWorkerFp(void *pWorker);
static int32_t sdbInitWorker(); static int32_t sdbInitWorker();
static void sdbCleanupWorker(); static void sdbCleanupWorker();
...@@ -284,6 +285,7 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) { ...@@ -284,6 +285,7 @@ static void sdbConfirmForward(void *ahandle, void *wparam, int32_t code) {
} }
dnodeSendRpcMWriteRsp(pMsg, pRow->code); dnodeSendRpcMWriteRsp(pMsg, pRow->code);
sdbFreeFromQueue(pRow);
} }
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); } static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
...@@ -951,7 +953,7 @@ static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) { ...@@ -951,7 +953,7 @@ static int32_t sdbWriteToQueue(SSdbRow *pRow, int32_t qtype) {
sdbIncRef(pRow->pTable, pRow->pObj); sdbIncRef(pRow->pTable, pRow->pObj);
sdbTrace("vgId:1, msg:%p write into to sdb queue", pRow->pMsg); sdbTrace("vgId:1, msg:%p qtype:%s write into to sdb queue, queued:%d", pRow->pMsg, qtypeStr[qtype], queued);
taosWriteQitem(tsSdbWQueue, qtype, pRow); taosWriteQitem(tsSdbWQueue, qtype, pRow);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
...@@ -974,6 +976,9 @@ static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, voi ...@@ -974,6 +976,9 @@ static int32_t sdbWriteWalToQueue(void *vparam, void *wparam, int32_t qtype, voi
return TSDB_CODE_VND_OUT_OF_MEMORY; return TSDB_CODE_VND_OUT_OF_MEMORY;
} }
memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len);
pRow->rowData = pRow->pHead->cont;
return sdbWriteToQueue(pRow, qtype); return sdbWriteToQueue(pRow, qtype);
} }
...@@ -1035,12 +1040,12 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1035,12 +1040,12 @@ static void *sdbWorkerFp(void *pWorker) {
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
sdbConfirmForward(NULL, pRow, pRow->code); sdbConfirmForward(NULL, pRow, pRow->code);
} else if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead->version, pRow->code);
}
sdbFreeFromQueue(pRow);
} }
sdbFreeFromQueue(pRow);
} }
} }
......
此差异已折叠。
...@@ -421,7 +421,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -421,7 +421,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t sid = taosAllocateId(pVgroup->idPool); int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) { if (sid <= 0) {
mDebug("app:%p:%p, db:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pDb->name, pVgroup->vgId); mDebug("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name, pVgroup->vgId);
continue; continue;
} }
...@@ -442,8 +442,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -442,8 +442,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t code = TSDB_CODE_MND_NO_ENOUGH_DNODES; int32_t code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
if (pDb->numOfVgroups < maxVgroupsPerDb) { if (pDb->numOfVgroups < maxVgroupsPerDb) {
mDebug("app:%p:%p, db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p db:%s, try to create a new vgroup, numOfVgroups:%d maxVgroupsPerDb:%d", pMsg,
pMsg, pDb->name, pDb->numOfVgroups, maxVgroupsPerDb); pMsg->rpcMsg.ahandle, pDb->name, pDb->numOfVgroups, maxVgroupsPerDb);
pthread_mutex_unlock(&pDb->mutex); pthread_mutex_unlock(&pDb->mutex);
code = mnodeCreateVgroup(pMsg); code = mnodeCreateVgroup(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
...@@ -455,8 +455,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -455,8 +455,8 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
if (pDb->numOfVgroups < 1) { if (pDb->numOfVgroups < 1) {
pthread_mutex_unlock(&pDb->mutex); pthread_mutex_unlock(&pDb->mutex);
mDebug("app:%p:%p, db:%s, failed create new vgroup since:%s, numOfVgroups:%d maxVgroupsPerDb:%d ", mDebug("msg:%p, app:%p db:%s, failed create new vgroup since:%s, numOfVgroups:%d maxVgroupsPerDb:%d ", pMsg,
pMsg->rpcMsg.ahandle, pMsg, pDb->name, tstrerror(code), pDb->numOfVgroups, maxVgroupsPerDb); pMsg->rpcMsg.ahandle, pDb->name, tstrerror(code), pDb->numOfVgroups, maxVgroupsPerDb);
return code; return code;
} }
...@@ -474,7 +474,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi ...@@ -474,7 +474,7 @@ int32_t mnodeGetAvailableVgroup(SMnodeMsg *pMsg, SVgObj **ppVgroup, int32_t *pSi
int32_t sid = taosAllocateId(pVgroup->idPool); int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid <= 0) { if (sid <= 0) {
mError("app:%p:%p, db:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pDb->name, pVgroup->vgId); mError("msg:%p, app:%p db:%s, no enough sid in vgId:%d", pMsg, pMsg->rpcMsg.ahandle, pDb->name, pVgroup->vgId);
pthread_mutex_unlock(&pDb->mutex); pthread_mutex_unlock(&pDb->mutex);
return TSDB_CODE_MND_NO_ENOUGH_DNODES; return TSDB_CODE_MND_NO_ENOUGH_DNODES;
} }
...@@ -496,10 +496,10 @@ static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) { ...@@ -496,10 +496,10 @@ static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
assert(pVgroup); assert(pVgroup);
mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mInfo("msg:%p, app:%p vgId:%d, is created in mnode, db:%s replica:%d", pMsg, pMsg->rpcMsg.ahandle, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes); pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mInfo("app:%p:%p, vgId:%d, index:%d, dnode:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, i, mInfo("msg:%p, app:%p vgId:%d, index:%d, dnode:%d", pMsg, pMsg->rpcMsg.ahandle, pVgroup->vgId, i,
pVgroup->vnodeGid[i].dnodeId); pVgroup->vnodeGid[i].dnodeId);
} }
...@@ -517,14 +517,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -517,14 +517,14 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
assert(pVgroup); assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mError("msg:%p, app:%p vgId:%d, failed to create in sdb, reason:%s", pMsg, pMsg->rpcMsg.ahandle, pVgroup->vgId,
tstrerror(code)); tstrerror(code));
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
sdbDeleteRow(&desc); sdbDeleteRow(&desc);
return code; return code;
} else { } else {
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mInfo("msg:%p, app:%p vgId:%d, is created in sdb, db:%s replica:%d", pMsg, pMsg->rpcMsg.ahandle, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes); pDb->name, pVgroup->numOfVnodes);
pVgroup->status = TAOS_VG_STATUS_READY; pVgroup->status = TAOS_VG_STATUS_READY;
SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
(void)sdbUpdateRow(&desc); (void)sdbUpdateRow(&desc);
...@@ -532,7 +532,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -532,7 +532,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
dnodeReprocessMWriteMsg(pMsg); dnodeReprocessMWriteMsg(pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
// if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) { // if (pVgroup->status == TAOS_VG_STATUS_CREATING || pVgroup->status == TAOS_VG_STATUS_READY) {
// mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, // mInfo("msg:%p, app:%p vgId:%d, is created in sdb, db:%s replica:%d", pMsg, pMsg->rpcMsg.ahandle, pVgroup->vgId,
// pDb->name, pVgroup->numOfVnodes); // pDb->name, pVgroup->numOfVnodes);
// pVgroup->status = TAOS_VG_STATUS_READY; // pVgroup->status = TAOS_VG_STATUS_READY;
// SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb}; // SSdbRow desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .pTable = tsVgroupSdb};
...@@ -540,7 +540,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -540,7 +540,7 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
// dnodeReprocessMWriteMsg(pMsg); // dnodeReprocessMWriteMsg(pMsg);
// return TSDB_CODE_MND_ACTION_IN_PROGRESS; // return TSDB_CODE_MND_ACTION_IN_PROGRESS;
// } else { // } else {
// mError("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle, // mError("msg:%p, app:%p vgId:%d, is created in sdb, db:%s replica:%d, but vgroup is dropping", pMsg->rpcMsg.ahandle,
// pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); // pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
// return TSDB_CODE_MND_VGROUP_NOT_EXIST; // return TSDB_CODE_MND_VGROUP_NOT_EXIST;
// } // }
......
...@@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) ...@@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (pMsg->rpcMsg.pCont == NULL) { if (pMsg->rpcMsg.pCont == NULL) {
mError("app:%p:%p, msg:%s content is null", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, app:%p type:%s content is null", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_INVALID_MSG_LEN; return TSDB_CODE_MND_INVALID_MSG_LEN;
} }
...@@ -54,15 +54,15 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,15 +54,15 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg, mDebug("msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps; epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse); htons(epSet->port[i]), epSet->inUse);
} else { } else {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i], mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i])); htons(epSet->port[i]));
} }
} }
...@@ -71,19 +71,19 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -71,19 +71,19 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
} }
if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) { if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("app:%p:%p, msg:%s not processed", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]); mError("msg:%p, app:%p type:%s not processed", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_MSG_NOT_PROCESSED; return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
int32_t code = mnodeInitMsg(pMsg); int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, msg:%s not processed, reason:%s", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType], mError("msg:%p, app:%p type:%s not processed, reason:%s", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType],
tstrerror(code)); tstrerror(code));
return code; return code;
} }
if (!pMsg->pUser->writeAuth) { if (!pMsg->pUser->writeAuth) {
mError("app:%p:%p, msg:%s not processed, no write auth", pMsg->rpcMsg.ahandle, pMsg, mError("msg:%p, app:%p type:%s not processed, no write auth", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType]); taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MND_NO_RIGHTS; return TSDB_CODE_MND_NO_RIGHTS;
} }
......
...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG ...@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG echo "cqdebugFlag 143" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册