提交 44620e05 编写于 作者: H hjxilinx

[td-98] support iplist

上级 fa5e2698
...@@ -351,7 +351,7 @@ typedef struct SSqlObj { ...@@ -351,7 +351,7 @@ typedef struct SSqlObj {
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
SRpcIpSet *ipList; SRpcIpSet ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
......
...@@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -209,7 +209,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
// assert(pQueryInfo->numOfTables == 0);
STableMetaInfo* pTableMetaInfo = NULL; STableMetaInfo* pTableMetaInfo = NULL;
if (pQueryInfo->numOfTables == 0) { if (pQueryInfo->numOfTables == 0) {
......
...@@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -169,17 +169,27 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
char *pMsg = rpcMallocCont(pSql->cmd.payloadLen); SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
pSql->ipList->ip[0] = inet_addr(tsPrivateIp);
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pSql->ipList.numOfIps = pTableMeta->numOfVpeers;
pSql->ipList.port = tsDnodeShellPort;
pSql->ipList.inUse = 0;
for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) {
pSql->ipList.ip[i] = pTableMeta->vpeerDesc[i].ip;
}
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -189,10 +199,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
} else { } else {
pSql->ipList->port = tsMnodeShellPort; pSql->ipList = tscMgmtIpList;
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); pSql->ipList.port = tsMnodeShellPort;
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
...@@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -201,7 +213,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg); rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -405,7 +417,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -405,7 +417,7 @@ int tscProcessSql(SSqlObj *pSql) {
} }
// temp // temp
pSql->ipList = &tscMgmtIpList; // pSql->ipList = tscMgmtIpList;
// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
// pSql->index = pTableMetaInfo->pTableMeta->index; // pSql->index = pTableMetaInfo->pTableMeta->index;
// } else { // it must be the parent SSqlObj for super table query // } else { // it must be the parent SSqlObj for super table query
...@@ -417,7 +429,7 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -417,7 +429,7 @@ int tscProcessSql(SSqlObj *pSql) {
// } // }
// } // }
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) { } else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
pSql->ipList = &tscMgmtIpList; pSql->ipList = tscMgmtIpList;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -532,9 +544,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -532,9 +544,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during copying data into paylaod // pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htons(pMsgDesc->numOfVnodes)); tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1836,6 +1848,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1836,6 +1848,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip);
pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId);
} }
SSchema* pSchema = pMetaMsg->schema; SSchema* pSchema = pMetaMsg->schema;
......
...@@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { ...@@ -860,12 +860,10 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd->allocSize = size; pCmd->allocSize = size;
} }
memset(pCmd->payload, 0, pCmd->payloadLen); memset(pCmd->payload, 0, pCmd->allocSize);
} }
//memset(pCmd->payload, 0, pCmd->allocSize);
assert(pCmd->allocSize >= size); assert(pCmd->allocSize >= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1508,9 +1508,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1508,9 +1508,9 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) { if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].publicIp);
} else { } else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; pMeta->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].privateIp);
} }
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
......
...@@ -2480,12 +2480,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl ...@@ -2480,12 +2480,11 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
} }
if (r == BLK_DATA_NO_NEEDED) { if (r == BLK_DATA_NO_NEEDED) {
// qTrace("QInfo:%p vid:%d sid:%d id:%s, slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", qTrace("QInfo:%p slot:%d, data block ignored, brange:%" PRId64 "-%" PRId64 ", rows:%d",
// rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->size);
// pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints);
} else if (r == BLK_DATA_FILEDS_NEEDED) { } else if (r == BLK_DATA_FILEDS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { if (tsdbRetrieveDataBlockStatisInfo(pRuntimeEnv->pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED; // return DISK_DATA_LOAD_FAILED;
} }
if (*pStatis == NULL) { if (*pStatis == NULL) {
...@@ -5908,14 +5907,6 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5908,14 +5907,6 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
// for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data);
// }
//
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData);
// }
sem_destroy(&(pQInfo->dataReady)); sem_destroy(&(pQInfo->dataReady));
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
...@@ -6125,9 +6116,6 @@ void qTableQuery(SQInfo *pQInfo) { ...@@ -6125,9 +6116,6 @@ void qTableQuery(SQInfo *pQInfo) {
dTrace("QInfo:%p query task is launched", pQInfo); dTrace("QInfo:%p query task is launched", pQInfo);
// sem_post(&pQInfo->dataReady);
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) { if (numOfTables == 1) {
singleTableQueryImpl(pQInfo); singleTableQueryImpl(pQInfo);
......
...@@ -37,10 +37,6 @@ typedef struct SField { ...@@ -37,10 +37,6 @@ typedef struct SField {
// todo need the definition // todo need the definition
} SField; } SField;
typedef struct SHeaderFileInfo {
int32_t fileId;
} SHeaderFileInfo;
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
...@@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { ...@@ -380,11 +376,12 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
SArray *sa = getDefaultLoadColumns(pQueryHandle, true); SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) { if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
// query ended in current block // query ended in current block
if (pQueryHandle->window.ekey < pBlock->keyLast) { if (pQueryHandle->window.ekey < pBlock->keyLast) {
doLoadDataFromFileBlock(pQueryHandle); doLoadDataFromFileBlock(pQueryHandle);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
} else { // the whole block is loaded in to buffer
pQueryHandle->realNumOfRows = pBlock->numOfPoints;
} }
} else {// todo desc query } else {// todo desc query
if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (pQueryHandle->window.ekey > pBlock->keyFirst) {
...@@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList ...@@ -932,10 +929,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->cur.fid < 0) { if (pHandle->cur.fid < 0) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册