未验证 提交 e2706d79 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1530 from taosdata/feature/query

Feature/query
...@@ -1512,6 +1512,9 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -1512,6 +1512,9 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
tfree(pState);
tfree(pSupporter);
// release data block data // release data block data
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
......
...@@ -587,7 +587,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -587,7 +587,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
strcpy(pTableMetaInfo->name, pDataBlock->tableId); strcpy(pTableMetaInfo->name, pDataBlock->tableId);
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
...@@ -599,7 +599,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -599,7 +599,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
/* /*
* the submit message consists of : [RPC header|message body|digest] * the submit message consists of : [RPC header|message body|digest]
* the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
* additional space. * additional space.
*/ */
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100); int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100);
...@@ -1277,7 +1277,7 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { ...@@ -1277,7 +1277,7 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) { void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) {
if (src == NULL) { if (src == NULL || src->numOfExprs == 0) {
return; return;
} }
...@@ -1983,22 +1983,24 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1983,22 +1983,24 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
return NULL; return NULL;
} }
memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd)); SSqlCmd* pnCmd = &pNew->cmd;
memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
pNew->cmd.command = cmd;
pNew->cmd.payload = NULL; pnCmd->command = cmd;
pNew->cmd.allocSize = 0; pnCmd->payload = NULL;
pnCmd->allocSize = 0;
pNew->cmd.pQueryInfo = NULL; pnCmd->pQueryInfo = NULL;
pNew->cmd.numOfClause = 0; pnCmd->numOfClause = 0;
pNew->cmd.clauseIndex = 0; pnCmd->clauseIndex = 0;
pnCmd->pDataBlocks = NULL;
if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) { if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
} }
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
...@@ -2018,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2018,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
} }
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex);
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "lz4.h" #include "lz4.h"
#include "taoserror.h" #include "taoserror.h"
#include "tsocket.h" #include "tsocket.h"
#include "shash.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "rpcUdp.h" #include "rpcUdp.h"
#include "rpcCache.h" #include "rpcCache.h"
...@@ -263,7 +262,6 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -263,7 +262,6 @@ void *rpcOpen(SRpcInit *pInit) {
} }
if (pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
if (pRpc->hash == NULL) { if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label); tError("%s failed to init string hash", pRpc->label);
...@@ -298,7 +296,6 @@ void rpcClose(void *param) { ...@@ -298,7 +296,6 @@ void rpcClose(void *param) {
} }
} }
// taosCleanUpStrHash(pRpc->hash);
taosHashCleanup(pRpc->hash); taosHashCleanup(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl); taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool); taosIdPoolCleanUp(pRpc->idPool);
...@@ -535,8 +532,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -535,8 +532,7 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
// taosDeleteStrHash(pRpc->hash, hashstr); taosHashRemove(pRpc->hash, hashstr, size);
// taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
...@@ -588,7 +584,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -588,7 +584,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn; if (ppConn) pConn = *ppConn;
if (pConn) return pConn; if (pConn) return pConn;
...@@ -621,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -621,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->localPort = (pRpc->localPort + pRpc->index); pConn->localPort = (pRpc->localPort + pRpc->index);
} }
// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册