diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index dad32e58da1dcea4b6850f2aaeca50e538e1c14d..80c3e77eff55585bb240a4f6a817b6fda0387ca1 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -351,7 +351,7 @@ typedef struct SSqlObj { char * sqlstr; char retry; char maxRetry; - SRpcIpSet *ipList; + SRpcIpSet ipList; char freed : 4; char listed : 4; tsem_t rspSem; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 40e70c247ef2d8d4a1cce6db07ac9f48271a5085..4462af7f950b069d774ff4222310fae367d9f592 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); -// assert(pQueryInfo->numOfTables == 0); STableMetaInfo* pTableMetaInfo = NULL; if (pQueryInfo->numOfTables == 0) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5ea6941fffcc3403d89b21ed2bb97a6747edc8d9..3042e3e59be30be1866685a1aa299d06b4b584ef 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { } int tscSendMsgToServer(SSqlObj *pSql) { - char *pMsg = rpcMallocCont(pSql->cmd.payloadLen); + SSqlCmd* pCmd = &pSql->cmd; + + char *pMsg = rpcMallocCont(pCmd->payloadLen); if (NULL == pMsg) { tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - pSql->ipList->ip[0] = inet_addr(tsPrivateIp); - if (pSql->cmd.command < TSDB_SQL_MGMT) { - pSql->ipList->port = tsDnodeShellPort; - tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + + pSql->ipList.numOfIps = pTableMeta->numOfVpeers; + pSql->ipList.port = tsDnodeShellPort; + pSql->ipList.inUse = 0; + + for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) { + pSql->ipList.ip[i] = pTableMeta->vpeerDesc[i].ip; + } + + tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { @@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); + rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg); } else { - pSql->ipList->port = tsMnodeShellPort; - tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + pSql->ipList = tscMgmtIpList; + pSql->ipList.port = tsMnodeShellPort; + + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg); + rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg); } return TSDB_CODE_SUCCESS; @@ -254,11 +266,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); return; } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code)); + tscTrace("%p it shall renew table meta, code:%d", pSql, tstrerror(rpcMsg->code)); pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->res.code = rpcMsg->code; // keep the previous error code - + if (++pSql->retry > pSql->maxRetry) { + tscError("%p max retry %d reached, ", pSql, pSql->retry); + return; + } + rpcMsg->code = tscRenewMeterMeta(pSql, pTableMetaInfo->name); if (pTableMetaInfo->pTableMeta) { @@ -405,7 +421,7 @@ int tscProcessSql(SSqlObj *pSql) { } // temp - pSql->ipList = &tscMgmtIpList; +// pSql->ipList = tscMgmtIpList; // if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pSql->index = pTableMetaInfo->pTableMeta->index; // } else { // it must be the parent SSqlObj for super table query @@ -417,7 +433,7 @@ int tscProcessSql(SSqlObj *pSql) { // } // } } else if (pSql->cmd.command < TSDB_SQL_LOCAL) { - pSql->ipList = &tscMgmtIpList; + pSql->ipList = tscMgmtIpList; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -532,9 +548,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted - // pSql->cmd.payloadLen is set during copying data into paylaod + // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htons(pMsgDesc->numOfVnodes)); + tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes)); return TSDB_CODE_SUCCESS; } @@ -1837,6 +1853,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); + pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip); + pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId); } SSchema* pSchema = pMetaMsg->schema; @@ -2436,7 +2454,7 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) { int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { int code = 0; - // handle metric meta renew process + // handle table meta renew process SSqlCmd *pCmd = &pSql->cmd; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 61b7ab4ec589ed2c9d9bad992d1ce6d10c60cbe8..dd0945ab0494c99ff9a01af97614291ea8084a7c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { pCmd->allocSize = size; } - memset(pCmd->payload, 0, pCmd->payloadLen); + memset(pCmd->payload, 0, pCmd->allocSize); } - //memset(pCmd->payload, 0, pCmd->allocSize); assert(pCmd->allocSize >= size); - return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index d6d7a6afc0cc649cee27ad45ef3483f816af7a61..24218998b57f228923b329fa9386deece17aad83 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1508,9 +1508,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { if (usePublicIp) { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].publicIp); } else { - pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; + pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp); } pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index f0aa13ee3d2de60c5eb15126d3c6fff8e1aa3c73..05e1c2f66fd30c8a3821b2e1898410e6a94eb619 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2480,12 +2480,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl } if (r == BLK_DATA_NO_NEEDED) { - // qTrace("QInfo:%p vid:%d sid:%d id:%s, slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", - // rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, - // pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); + qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d", + GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->size); } else if (r == BLK_DATA_FILEDS_NEEDED) { if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { -// return DISK_DATA_LOAD_FAILED; + // return DISK_DATA_LOAD_FAILED; } if (*pStatis == NULL) { @@ -5909,14 +5908,6 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->sdata[col]); } - // for (int col = 0; col < pQuery->numOfCols; ++col) { - // vnodeFreeColumnInfo(&pQuery->colList[col].data); - // } - // - // if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - // tfree(pQuery->tsData); - // } - sem_destroy(&(pQInfo->dataReady)); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); @@ -6126,9 +6117,6 @@ void qTableQuery(SQInfo *pQInfo) { dTrace("QInfo:%p query task is launched", pQInfo); -// sem_post(&pQInfo->dataReady); -// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER; - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); if (numOfTables == 1) { singleTableQueryImpl(pQInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 52d05dc626f59804cfa8926164d7143e2d6bd6a8..31426fc53c960d8b70fe9e64d1f5c9d4a3c56b5e 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1097,6 +1097,10 @@ static void rpcProcessConnError(void *param, void *id) { SRpcInfo *pRpc = pContext->pRpc; SRpcMsg rpcMsg; + if (pRpc == NULL) { + return; + } + tTrace("%s connection error happens", pRpc->label); if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 49cbd1004256892c2b66fc784629e5bf9f7550e5..a44174ae6613eccee7accaa9b14437c8b24e310a 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -37,10 +37,6 @@ typedef struct SField { // todo need the definition } SField; -typedef struct SHeaderFileInfo { - int32_t fileId; -} SHeaderFileInfo; - typedef struct SQueryFilePos { int32_t fid; int32_t slot; @@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { SArray *sa = getDefaultLoadColumns(pQueryHandle, true); if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { - // query ended in current block if (pQueryHandle->window.ekey < pBlock->keyLast) { doLoadDataFromFileBlock(pQueryHandle); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); + } else { // the whole block is loaded in to buffer + pQueryHandle->realNumOfRows = pBlock->numOfPoints; } } else {// todo desc query if (pQueryHandle->window.ekey > pBlock->keyFirst) { @@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; if (pHandle->cur.fid < 0) { - - - - return pHandle->pColumns; } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);