From 920edd131db50f73d45086ad5e65fda59fad0a25 Mon Sep 17 00:00:00 2001 From: hjliao Date: Fri, 30 Aug 2019 17:51:47 +0800 Subject: [PATCH] fix bugs #436 --- src/system/src/mgmtShell.c | 68 ++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/src/system/src/mgmtShell.c b/src/system/src/mgmtShell.c index 98bd1b9c76..6a3b59288d 100644 --- a/src/system/src/mgmtShell.c +++ b/src/system/src/mgmtShell.c @@ -14,16 +14,15 @@ */ #include -#include -#include #include "taosmsg.h" #include "dnodeSystem.h" #include "mgmt.h" #include "mgmtProfile.h" #include "tlog.h" -#pragma GCC diagnostic ignored "-Wint-conversion" -#pragma GCC diagnostic ignored "-Wpointer-sign" + +#pragma GCC diagnostic ignored "-Wint-conversion" +#pragma GCC diagnostic ignored "-Wpointer-sign" void * pShellConn = NULL; SConnObj *connList; @@ -119,6 +118,27 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp * return pStart; } +/** + * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one. + * + * @param pMsg + * @return + */ +bool mgmtCheckMeterMetaMsgType(char *pMsg) { + SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; + + int16_t autoCreate = htons(pInfo->createFlag); + STabObj *pMeterObj = mgmtGetMeter(pInfo->meterId); + + // If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue + bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1); + if (addIntoTranQueue) { + mTrace("meter:%s auto created task added", pInfo->meterId); + } + + return addIntoTranQueue; +} + int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; STabObj * pMeterObj = NULL; @@ -133,7 +153,8 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE; - if ((pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY) || pConn->pDb == NULL) { + if (pConn->pDb == NULL || (pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY)) { + // todo handle failed to allocate msg buffer if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { return 0; } @@ -145,14 +166,24 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { } pMeterObj = mgmtGetMeter(pInfo->meterId); + + // on demand create table from super table if meter does not exists if (pMeterObj == NULL && pInfo->createFlag == 1) { - // create the meter objects if not exists SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); + if (pCreateMsg == NULL) { + return 0; + } + memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); strcpy(pCreateMsg->meterId, pInfo->meterId); - // todo handle if not master mnode + int32_t code = mgmtCreateMeter(pConn->pDb, pCreateMsg); - mTrace("meter:%s is automatically created by %s, code:%d", pCreateMsg->meterId, pConn->pUser->user, code); + + char stableName[TSDB_METER_ID_LEN] = {0}; + strncpy(stableName, pInfo->tags, TSDB_METER_ID_LEN); + mTrace("meter:%s is automatically created by %s from %s, code:%d", pCreateMsg->meterId, pConn->pUser->user, + stableName, code); + tfree(pCreateMsg); if (code != TSDB_CODE_SUCCESS) { @@ -180,7 +211,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { pRsp->code = TSDB_CODE_DB_NOT_SELECTED; pMsg++; } else { - mTrace("meter:%s, meta is retrieved from:%s", pInfo->meterId, pMeterObj->meterId); + mTrace("%s, uid:%lld meter meta is retrieved", pInfo->meterId, pMeterObj->uid); pRsp->code = 0; pMsg += sizeof(STaosRsp); *pMsg = TSDB_IE_TYPE_META; @@ -199,8 +230,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { pMeta->meterType = htons(pMeterObj->meterType); pMsg += sizeof(SMeterMeta); - pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta - // struct + pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct if (mgmtMeterCreateFromMetric(pMeterObj)) { assert(pMeterObj->numOfTags == 0); @@ -208,8 +238,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData); uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns; - pMeta->numOfTags = htons(pMetric->numOfTags); // update the numOfTags - // info + pMeta->numOfTags = htons(pMetric->numOfTags); // update the numOfTags info mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols); pMsg += numOfTotalCols * sizeof(SSchema); @@ -233,7 +262,10 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { pRsp->code = TSDB_CODE_INVALID_TABLE; goto _exit_code; } - pMeta->vpeerDesc[0].vnode = htonl(pVgroup->vnodeGid[0].vnode); + for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } } } @@ -934,12 +966,14 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { char *cont = (char *)pMsg->content + sizeof(SMgmtHead); int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); - if (pMsg->msgType == TSDB_MSG_TYPE_METERINFO || pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || - pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || pMsg->msgType == TSDB_MSG_TYPE_SHOW) { + + // read-only request can be executed concurrently + if ((pMsg->msgType == TSDB_MSG_TYPE_METERINFO && (!mgmtCheckMeterMetaMsgType(cont))) || + pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || + pMsg->msgType == TSDB_MSG_TYPE_SHOW) { (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); } else { if (mgmtProcessShellMsg[pMsg->msgType]) { - // TODO : put the msg in tran queue SSchedMsg schedMsg; schedMsg.msg = malloc(pMsg->msgLen); // Message to deal with memcpy(schedMsg.msg, pMsg, pMsg->msgLen); -- GitLab