未验证 提交 f640fe4a 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1714 from taosdata/feature/query

Feature/query
...@@ -196,14 +196,14 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex); ...@@ -196,14 +196,14 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex); SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo); int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols); SVgroupsInfo* vgroupList, SArray* pTagCols);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd); void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
......
...@@ -298,6 +298,7 @@ typedef struct STscObj { ...@@ -298,6 +298,7 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
void* pMgmtConn;
struct SSqlObj * pSql; struct SSqlObj * pSql;
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
...@@ -359,7 +360,7 @@ typedef struct SSqlStream { ...@@ -359,7 +360,7 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next; struct SSqlStream *prev, *next;
} SSqlStream; } SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret); int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn);
void tscInitMsgsFp(); void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
...@@ -427,9 +428,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); ...@@ -427,9 +428,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column); char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
extern void * pVnodeConn; extern void * pVnodeConn;
extern void * pTscMgmtConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern int slaveIndex;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
......
...@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param; pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = 1; pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->fp = fp; pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
...@@ -733,7 +733,7 @@ int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SS ...@@ -733,7 +733,7 @@ int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SS
*/ */
if (size > 0) { if (size > 0) {
if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
tscClearMeterMetaInfo(pTableMetaInfo, false); tscClearTableMetaInfo(pTableMetaInfo, false);
} }
} else { } else {
assert(pTableMetaInfo->pTableMeta == NULL); assert(pTableMetaInfo->pTableMeta == NULL);
...@@ -2477,6 +2477,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* ...@@ -2477,6 +2477,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
pQueryInfo->groupbyExpr.numOfGroupCols = pList->nExpr; pQueryInfo->groupbyExpr.numOfGroupCols = pList->nExpr;
if (pList->nExpr > TSDB_MAX_TAGS) { if (pList->nExpr > TSDB_MAX_TAGS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
...@@ -4915,7 +4919,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { ...@@ -4915,7 +4919,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
list.num = 1; list.num = 1;
list.ids[0] = colIndex; list.ids[0] = colIndex;
insertResultField(pQueryInfo, size - 1, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr); insertResultField(pQueryInfo, size, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1);
pInfo->visible = false; pInfo->visible = false;
} }
......
...@@ -190,6 +190,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -190,6 +190,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
} }
int tscSendMsgToServer(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
...@@ -223,7 +224,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -223,7 +224,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg); rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2599,7 +2600,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2599,7 +2600,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
// if (pSql->fp != NULL && pSql->pStream == NULL) { // if (pSql->fp != NULL && pSql->pStream == NULL) {
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); // pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// tscFreeSubqueryInfo(pCmd); // tscFreeQueryInfo(pCmd);
// } // }
tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew); tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew);
......
...@@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL; return NULL;
} }
if (tscInitRpc(user, pass) != 0) { void* pMgmtConn = NULL;
if (tscInitRpc(user, pass, &pMgmtConn) != 0) {
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
return NULL; return NULL;
} }
...@@ -118,6 +119,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -118,6 +119,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strtolower(pObj->db, tmp); strtolower(pObj->db, tmp);
} }
pObj->pMgmtConn = pMgmtConn;
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
......
...@@ -147,7 +147,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -147,7 +147,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
retryDelay); retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearMeterMetaInfo(pTableMetaInfo, true); tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; return;
...@@ -177,7 +177,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -177,7 +177,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) { if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscClearMeterMetaInfo(pTableMetaInfo, true); tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return; return;
...@@ -259,7 +259,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -259,7 +259,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes); pStream->numOfRes);
// release the metric/meter meta information reference, so data in cache can be updated // release the metric/meter meta information reference, so data in cache can be updated
tscClearMeterMetaInfo(pTableMetaInfo, false); tscClearTableMetaInfo(pTableMetaInfo, false);
tscSetNextLaunchTimer(pStream, pSql); tscSetNextLaunchTimer(pStream, pSql);
} }
} }
......
...@@ -33,9 +33,6 @@ ...@@ -33,9 +33,6 @@
// global, not configurable // global, not configurable
void * pVnodeConn; void * pVnodeConn;
void * pVMeterConn;
void * pTscMgmtConn;
void * pSlaveConn;
void * tscCacheHandle; void * tscCacheHandle;
int slaveIndex; int slaveIndex;
void * tscTmr; void * tscTmr;
...@@ -54,7 +51,7 @@ void tscCheckDiskUsage(void *para, void *unused) { ...@@ -54,7 +51,7 @@ void tscCheckDiskUsage(void *para, void *unused) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int32_t tscInitRpc(const char *user, const char *secret) { int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
SRpcInit rpcInit; SRpcInit rpcInit;
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
...@@ -80,7 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -80,7 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
} }
} }
if (pTscMgmtConn == NULL) { if (*pMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp; rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0; rpcInit.localPort = 0;
...@@ -96,8 +93,8 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -96,8 +93,8 @@ int32_t tscInitRpc(const char *user, const char *secret) {
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.secret = secretEncrypt; rpcInit.secret = secretEncrypt;
pTscMgmtConn = rpcOpen(&rpcInit); *pMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) { if (*pMgmtConn == NULL) {
tscError("failed to init connection to mgmt"); tscError("failed to init connection to mgmt");
return -1; return -1;
} }
...@@ -211,11 +208,6 @@ void taos_cleanup() { ...@@ -211,11 +208,6 @@ void taos_cleanup() {
pVnodeConn = NULL; pVnodeConn = NULL;
} }
if (pTscMgmtConn != NULL) {
rpcClose(pTscMgmtConn);
pTscMgmtConn = NULL;
}
taosTmrCleanUp(tscTmr); taosTmrCleanUp(tscTmr);
} }
......
...@@ -337,7 +337,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) { ...@@ -337,7 +337,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->pTableList= NULL; pCmd->pTableList= NULL;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd); tscFreeQueryInfo(pCmd);
} }
/* /*
...@@ -761,6 +761,8 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -761,6 +761,8 @@ void tscCloseTscObj(STscObj* pObj) {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
sem_destroy(&pSql->rspSem); sem_destroy(&pSql->rspSem);
rpcClose(pObj->pMgmtConn);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscTrace("%p DB connection is closed", pObj); tscTrace("%p DB connection is closed", pObj);
...@@ -1459,7 +1461,7 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { ...@@ -1459,7 +1461,7 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
void tscCleanSqlCmd(SSqlCmd* pCmd) { void tscCleanSqlCmd(SSqlCmd* pCmd) {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd); tscFreeQueryInfo(pCmd);
uint32_t allocSize = pCmd->allocSize; uint32_t allocSize = pCmd->allocSize;
char* allocPtr = pCmd->payload; char* allocPtr = pCmd->payload;
...@@ -1601,7 +1603,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1601,7 +1603,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond); tscTagCondRelease(&pQueryInfo->tagCond);
tscFieldInfoClear(&pQueryInfo->fieldsInfo); tscFieldInfoClear(&pQueryInfo->fieldsInfo);
...@@ -1611,6 +1613,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1611,6 +1613,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
tscColumnListDestroy(pQueryInfo->colList); tscColumnListDestroy(pQueryInfo->colList);
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
pQueryInfo->groupbyExpr.columnInfo = NULL;
}
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
tfree(pQueryInfo->defaultVal); tfree(pQueryInfo->defaultVal);
...@@ -1619,11 +1626,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1619,11 +1626,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
void tscClearSubqueryInfo(SSqlCmd* pCmd) { void tscClearSubqueryInfo(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->numOfClause; ++i) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
} }
} }
void tscFreeSubqueryInfo(SSqlCmd* pCmd) { void tscFreeQueryInfo(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return; return;
} }
...@@ -1632,7 +1639,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1632,7 +1639,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) {
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd); char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
doClearSubqueryInfo(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
tfree(pQueryInfo); tfree(pQueryInfo);
} }
...@@ -1691,7 +1698,7 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro ...@@ -1691,7 +1698,7 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index);
tscClearMeterMetaInfo(pTableMetaInfo, removeFromCache); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo); free(pTableMetaInfo);
int32_t after = pQueryInfo->numOfTables - index - 1; int32_t after = pQueryInfo->numOfTables - index - 1;
...@@ -1713,13 +1720,18 @@ void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool ...@@ -1713,13 +1720,18 @@ void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool
tfree(pQueryInfo->pTableMetaInfo); tfree(pQueryInfo->pTableMetaInfo);
} }
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) { void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) {
if (pTableMetaInfo == NULL) { if (pTableMetaInfo == NULL) {
return; return;
} }
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
tfree(pTableMetaInfo->vgroupList); tfree(pTableMetaInfo->vgroupList);
if (pTableMetaInfo->tagColList != NULL) {
taosArrayDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
}
} }
void tscResetForNextRetrieve(SSqlRes* pRes) { void tscResetForNextRetrieve(SSqlRes* pRes) {
......
...@@ -990,9 +990,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat ...@@ -990,9 +990,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, int16_t *bytes) { static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) {
char *groupbyColumnData = NULL;
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
...@@ -1015,12 +1013,22 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1 ...@@ -1015,12 +1013,22 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1
*type = pQuery->colList[colIndex].type; *type = pQuery->colList[colIndex].type;
*bytes = pQuery->colList[colIndex].bytes; *bytes = pQuery->colList[colIndex].bytes;
/*
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf); * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare
break; * stage, the remain meter may not have the required column in cache actually. So, the validation of required
* column in cache with the corresponding meter schema is reinforced.
*/
int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pColIndex->colId == p->info.colId) {
return p->pData;
}
}
} }
return groupbyColumnData; return NULL;
} }
static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
...@@ -1091,8 +1099,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1091,8 +1099,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
char *groupbyColumnData = NULL; char *groupbyColumnData = NULL;
if (groupbyStateValue) { if (groupbyStateValue) {
assert(0); groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
} }
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
...@@ -6088,9 +6095,16 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) ...@@ -6088,9 +6095,16 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
STableId *id = taosArrayGet(pTableIdList, 0); STableId *id = taosArrayGet(pTableIdList, 0);
id->uid = -1; // todo fix me id->uid = -1; // todo fix me
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
// todo handle the error
/*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, /*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex,
pQueryMsg->numOfGroupCols); numOfGroupByCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _query_over; goto _query_over;
......
...@@ -58,7 +58,7 @@ TEST(testCase, patternMatchTest) { ...@@ -58,7 +58,7 @@ TEST(testCase, patternMatchTest) {
EXPECT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH); EXPECT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
str = "abcdefgabcdeju"; str = "abcdefgabcdeju";
ret = patternMatch("abc%f_", str, 1, &info); ret = patternMatch("abc%f_", str, 1, &info); // pattern string is longe than the size
EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH); EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH);
str = "abcdefgabcdeju"; str = "abcdefgabcdeju";
...@@ -72,4 +72,8 @@ TEST(testCase, patternMatchTest) { ...@@ -72,4 +72,8 @@ TEST(testCase, patternMatchTest) {
str = "abcdefgabcdeju"; str = "abcdefgabcdeju";
ret = patternMatch("a__", str, 2, &info); ret = patternMatch("a__", str, 2, &info);
EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH); EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH);
str = "carzero";
ret = patternMatch("%o", str, strlen(str), &info);
EXPECT_EQ(ret, TSDB_PATTERN_MATCH);
} }
...@@ -75,7 +75,7 @@ int main(int argc, char *argv[]) { ...@@ -75,7 +75,7 @@ int main(int argc, char *argv[]) {
doQuery(taos, "create database if not exists test"); doQuery(taos, "create database if not exists test");
doQuery(taos, "use test"); doQuery(taos, "use test");
doQuery(taos, "insert into tm99 values('2020-01-01 1:1:1', 99);"); doQuery(taos, "select count(*),k,sum(k) from m1 group by k");
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); // doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
...@@ -86,7 +86,7 @@ int main(int argc, char *argv[]) { ...@@ -86,7 +86,7 @@ int main(int argc, char *argv[]) {
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
doQuery(taos, "select sum(k),count(*) from m1 group by a"); // doQuery(taos, "select sum(k),count(*) from m1 group by a");
taos_close(taos); taos_close(taos);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册