diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index fc88fc7cf7a2b3d8aa158cd6646872871c89a174..bedaab1b2f909d2903a2977880f049093c391641 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -71,6 +71,7 @@ typedef struct STableMeta { typedef struct STableMetaInfo { STableMeta * pTableMeta; // table meta, cached in client side and acquried by name SVgroupsInfo* vgroupList; + /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 9d965b6cd7a4dd80e0ee2f335c66de907e76d909..8eabbcb62a1ac0d160b897163a4009b1d633c277 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_REPLICA_MAX_NUM; + pSql->maxRetry = 1; pSql->fp = fp; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 26987858bd5ea30fd1aa771b8b8a559d12487039..f5835e1fdd74915715220d165c2fc55aa14bd225 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query super table + int32_t index = pTableMetaInfo->vgroupIndex; - if (pTableMetaInfo->vgroupIndex < 0) { - tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vgroupIndex); + if (index < 0) { + tscError("%p error vgroupIndex:%d", pSql, index); return -1; } - pSql->ipList.numOfIps = 1; // todo fix me + SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; + + pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me pSql->ipList.port = tsDnodeShellPort; pSql->ipList.inUse = 0; - // todo extract method - SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[pTableMetaInfo->vgroupIndex]; - pSql->ipList.ip[0] = pVgroupInfo->ipAddr[0].ip; + for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + pSql->ipList.ip[i] = pVgroupInfo->ipAddr[i].ip; + } #if 0 SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex); @@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } #endif - tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, - pTableMetaInfo->vgroupIndex); + tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); numOfTables = 1; @@ -823,7 +825,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg); SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; - if (pGroupbyExpr->numOfGroupCols != 0) { + if (pGroupbyExpr->numOfGroupCols > 0) { pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderType = htons(pGroupbyExpr->orderType); @@ -2133,10 +2135,11 @@ _error_clean: for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) { SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i]; - pVgroups->numOfIps = htonl(pVgroups->numOfIps); + pVgroups->vgId = htonl(pVgroups->vgId); + assert(pVgroups->numOfIps >= 1); - for(int32_t j = 0; j < tListLen(pVgroups->ipAddr); ++j) { + for(int32_t j = 0; j < pVgroups->numOfIps; ++j) { pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip); pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 47860e4390c93eff7c620bc8441b6a6ef17d0c6e..515ae90db3c44d302cdb16760275a98d0f272602 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -690,7 +690,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current data are exhausted, fetch more data if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && - (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { + (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || + pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) { taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); sem_wait(&pSql->rspSem); @@ -773,8 +774,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (pRes == NULL || pRes->qhandle == 0) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); + STscObj* pTscObj = pSql->pTscObj; - if (tscShouldFreeAsyncSqlObj(pSql)) { + if (pTscObj->pSql != pSql) { tscTrace("%p SqlObj is freed by app", pSql); tscFreeSqlObj(pSql); } else { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0bc1becacaa15d4a49ac6cb53dace85e044cfa24..cf953325832bd4d079934aab2fcb71a364e09beb 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1207,7 +1207,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO // all subqueries are failed tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code); - pPObj->res.code = -(pState->code); + pPObj->res.code = pState->code; // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, @@ -1336,12 +1336,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSqlRes * pRes = &pSql->res; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); -// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - -// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); -// SVnodeSidList *vnodeInfo = 0; -// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; - if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); @@ -1384,11 +1378,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR pthread_mutex_unlock(&trsupport->queryMutex); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); } + + pthread_mutex_unlock(&trsupport->queryMutex); } else { // all data has been retrieved to client tscAllDataRetrievedFromDnode(trsupport, pSql); } - - pthread_mutex_unlock(&trsupport->queryMutex); } static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { @@ -1479,10 +1473,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { tscHandleSubqueryError(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode - tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, + tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex); - taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); + if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode + tscRetrieveFromDnodeCallBack(param, pSql, 0); + } else { + taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); + } + } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d70e2bb7f6b7b28363e82fd4de8390f4f40b6863..f7146115393f5cdbcfbd7f6844260b893a29ef3a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2128,27 +2128,23 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void if (pPrevSql == NULL) { STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); - -// SSuperTableMeta* pMetricMeta = NULL; -// if (cmd == TSDB_SQL_SELECT) { -// pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); -// } pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags, pTableMetaInfo->tagColumnIndex); - } else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object. -// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); - -// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); -// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); + } else { // transfer the ownership of pTableMeta to the newly create sql object. + STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); -// pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags, -// pTableMetaInfo->tagColumnIndex); + STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); + SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList; + pPrevInfo->vgroupList = NULL; + + pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->numOfTags, + pTableMetaInfo->tagColumnIndex); } assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1); if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { -// assert(pFinalInfo->pMetricMeta != NULL); + assert(pFinalInfo->vgroupList != NULL); } if (cmd == TSDB_SQL_SELECT) { diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h index fdaf54e0e5a21f0ef79d3a4158d0e8b2bfe1322b..6d413ada884cc874a8277888995b96fa3e70aa48 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeMClient.h @@ -23,8 +23,6 @@ extern "C" { int32_t dnodeInitMClient(); void dnodeCleanupMClient(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); -void * dnodeGetMnodeList(); -int32_t dnodeGetDnodeId(); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index abfee2239bc7e8b2a6d9f339928ef51a63bc14fe..8f62e3adc09edf3fac58b43fc03f476630946733 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -22,11 +22,11 @@ #include "trpc.h" #include "tsdb.h" #include "twal.h" +#include "vnode.h" #include "dnodeMClient.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "dnodeWrite.h" -#include "vnode.h" static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index c50391613eba3da6fafb9cb16219d63756a440d8..57ef655078d46d42516850692eb8730f22d5e7c6 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus); tsModule[TSDB_MOD_MGMT].enable = true; dnodeSetModuleStatus(TSDB_MOD_MGMT); - (*tsModule[TSDB_MOD_MGMT].stopFp)(); + (*tsModule[TSDB_MOD_MGMT].startFp)(); } if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 6215b5a7ee83d84b12ae34cddb16b0dc3c4746a5..0be18a007e831614d0e5362bccf32a7b529abc8b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); bool dnodeIsFirstDeploy(); uint32_t dnodeGetMnodeMasteIp(); +void * dnodeGetMnodeList(); +int32_t dnodeGetDnodeId(); #ifdef __cplusplus } diff --git a/src/inc/query.h b/src/inc/query.h new file mode 100644 index 0000000000000000000000000000000000000000..ffeb225223e8276ca1a4d0a57a561d379dee578e --- /dev/null +++ b/src/inc/query.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_QUERY_H +#define TDENGINE_QUERY_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* qinfo_t; + +/** + * create the qinfo object according to QueryTableMsg + * @param pVnode + * @param pQueryTableMsg + * @param qinfo + * @return + */ +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); + +/** + * Destroy QInfo object + * + * @param qinfo + * @return + */ +void qDestroyQueryInfo(qinfo_t qinfo); + +/** + * the main query execution function, including query on both table and multitables, + * which are decided according to the tag or table name query conditions + * + * @param qinfo + * @return + */ +void qTableQuery(qinfo_t qinfo); + +/** + * Retrieve the produced results information, if current query is not paused or completed, + * this function will be blocked to wait for the query execution completed or paused, + * in which case enough results have been produced already. + * + * @param qinfo + * @return + */ +int32_t qRetrieveQueryResultInfo(qinfo_t qinfo); + +/** + * + * Retrieve the actual results to fill the response message payload. + * Note that this function must be executed after qRetrieveQueryResultInfo is invoked. + * + * @param qinfo qinfo object + * @param pRsp response message + * @param contLen payload length + * @return + */ +int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen); + +/** + * Decide if more results will be produced or not + * + * @param qinfo + * @return + */ +bool qHasMoreResultsToRetrieve(qinfo_t qinfo); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_QUERY_H diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 79d086153adb6acd5e1bf01d3e047f5c98636314..f292345e68636f1480c4c7c738965787362dbbe4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -584,6 +584,7 @@ typedef struct { char dnodeName[TSDB_NODE_NAME_LEN + 1]; uint32_t privateIp; uint32_t publicIp; + uint32_t moduleStatus; uint32_t lastReboot; // time stamp for last reboot uint16_t numOfTotalVnodes; // from config file uint16_t openVnodes; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7bb63658655bf5eabc986dc44192c4e8206e6769..89acf66f21a2d7fa6d622da07f735307e38413a2 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -93,6 +93,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); void tsdbClearTableCfg(STableCfg *config); +int32_t tsdbGetTableTagVal(tsdb_repo_t *repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val); + int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); @@ -159,11 +161,16 @@ typedef struct SDataBlockInfo { int32_t sid; } SDataBlockInfo; +typedef struct { + size_t numOfTables; + SArray* pGroupList; +} STableGroupInfo; + typedef struct { } SFields; #define TSDB_TS_GREATER_EQUAL 1 -#define TSDB_TS_LESS_EQUAL 2 +#define TSDB_TS_LESS_EQUAL 2 typedef struct SQueryRowCond { int32_t rel; @@ -178,7 +185,7 @@ typedef void *tsdbpos_t; * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t *tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); +tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, SArray *pColumnInfo); /** * move to next block @@ -280,10 +287,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryTags(tsdb_repo_t *tsdb, int64_t uid, const char *pTagCond, size_t len, SArray **pGroupList, - SColIndex *pColIndex, int32_t numOfCols); +int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList, + SColIndex* pColIndex, int32_t numOfCols); -int32_t tsdbGetOneTableGroup(tsdb_repo_t *tsdb, int64_t uid, SArray **pGroupList); +int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo); /** * clean up the query handle diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 14bb6e04e5226118e49ec4e516cd2751ff7f103f..48111d3110c7fc6b2f3e54c062329bdf9cfd70c2 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -31,7 +31,6 @@ int32_t mgmtInitDnodes(); void mgmtCleanupDnodes(); char* mgmtGetDnodeStatusStr(int32_t dnodeStatus); -bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType); void mgmtMonitorDnodeModule(); int32_t mgmtGetDnodesNum(); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 89c4796d85d68a7893830439b4f8c081c2fefd61..3c66ff6c57a61a56c361d1d125cedca152f2e2e0 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { static int32_t mgmtDnodeActionRestored() { int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); - if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) { + if (numOfRows <= 0 && dnodeIsFirstDeploy()) { uint32_t ip = inet_addr(tsPrivateIp); mgmtCreateDnode(ip); SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); @@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->privateIp = htonl(pStatus->privateIp); pStatus->publicIp = htonl(pStatus->publicIp); + pStatus->moduleStatus = htonl(pStatus->moduleStatus); pStatus->lastReboot = htonl(pStatus->lastReboot); pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); @@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pDnode->diskAvailable = pStatus->diskAvailable; pDnode->alternativeRole = pStatus->alternativeRole; pDnode->totalVnodes = pStatus->numOfTotalVnodes; + pDnode->moduleStatus = pStatus->moduleStatus; if (pStatus->dnodeId == 0) { mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); @@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mgmtGetMnodeList(&pRsp->mnodes); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); - pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); + pRsp->dnodeState.moduleStatus = htonl((int32_t)pDnode->isMgmt); pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000); pRsp->dnodeState.numOfVnodes = 0; @@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) { pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM; sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1); - if (pDnode->privateIp == inet_addr(tsMasterIp)) { - pDnode->moduleStatus |= (1 << TSDB_MOD_MGMT); - } - SSdbOperDesc oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, @@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi return numOfRows; } -bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { +static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { uint32_t status = pDnode->moduleStatus & (1 << moduleType); return status > 0; } diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 371a09c03f4b0ca2f00ae6887b988800e9721874..38f18b462ae69427088eea77bc7652ce9d144624 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -20,6 +20,7 @@ #include "treplica.h" #include "tgrant.h" #include "ttimer.h" +#include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtAcct.h" @@ -100,6 +101,10 @@ int32_t mgmtStartSystem() { return -1; } + if (replicaInit() < 0) { + mError("failed to init replica") + } + if (mgmtInitDClient() < 0) { return -1; } @@ -108,10 +113,6 @@ int32_t mgmtStartSystem() { return -1; } - if (replicaInit() < 0) { - mError("failed to init dnode balance") - } - grantReset(TSDB_GRANT_ALL, 0); tsMgmtIsRunning = true; diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index e469dc4f6f7ccbcf4a4ba3f2a878b1e0ffdcff1f..8087ce5ad1513cdc89795fbe745bad28c96c96f6 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { SMnodeObj *pMnode = pOper->pObj; + + SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); + if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; + pDnode->isMgmt = false; + mgmtReleaseDnode(pDnode); + mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 013c0236d0b8240f20e2cc81e526562baf6d4d1c..4bc18d6a0de369bd417f4e8f9e4abdf7cc439e17 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; -uint64_t sdbGetVersion() { return tsSdbObj->version; } int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } +uint64_t sdbGetVersion() { + if (tsSdbObj) + return tsSdbObj->version; + else + return 0; +} + static char *sdbGetActionStr(int32_t action) { switch (action) { case SDB_ACTION_INSERT: @@ -147,10 +153,6 @@ void sdbCleanUp() { } } -SSdbObject *sdbGetObj() { - return tsSdbObj; -} - void sdbIncRef(void *handle, void *pRow) { if (pRow) { SSdbTable *pTable = handle; diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 92037ba7932ef81c2d42cf25950a4ac5662261e2..9098a0c17d4db0c8b955cc5b97c187f1d56262d5 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -18,6 +18,7 @@ #include "trpc.h" #include "ttime.h" #include "tutil.h" +#include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtAcct.h" @@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { } static int32_t mgmtUserActionRestored() { - if (strcmp(tsMasterIp, tsPrivateIp) == 0) { + if (dnodeIsFirstDeploy()) { SAcctObj *pAcct = mgmtGetAcct("root"); mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "monitor", tsInternalPass); diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index 72d6f9bf47475dbf454ed8c4eaae557f5dcb3522..bd5e61c321acd90a14a74053d101859c22cf285b 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -47,7 +47,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *); * */ typedef struct tQueryInfo { - int32_t offset; // offset value in tags int32_t colIndex; // index of column in schema uint8_t optr; // expression operator SSchema sch; // schema of tags diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index fb8a908910fd89149d326110bf600c9c3a6d5b99..244f15e1ddc64a4889cd08466424960814bb44d8 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -18,6 +18,7 @@ #include "os.h" #include "hash.h" +#include "tsdb.h" #include "qinterpolation.h" #include "qresultBuf.h" #include "qsqlparser.h" @@ -89,7 +90,7 @@ typedef struct SColumnFilterElem { } SColumnFilterElem; typedef struct SSingleColumnFilterInfo { - SColumnInfoData info; + SColumnInfoData info; int32_t numOfFilters; SColumnFilterElem* pFilters; void* pData; @@ -108,8 +109,6 @@ typedef struct STableQueryInfo { } STableQueryInfo; typedef struct STableDataInfo { - int32_t numOfBlocks; - int32_t start; // start block index int32_t tableIndex; int32_t groupIdx; // group id in table list STableQueryInfo* pTableQInfo; @@ -171,7 +170,7 @@ typedef struct SQInfo { int32_t pointsInterpo; int32_t code; // error code to returned to client sem_t dataReady; - SArray* pTableList; // table id list + STableGroupInfo groupInfo; // table id list void* tsdb; SQueryRuntimeEnv runtimeEnv; @@ -187,50 +186,7 @@ typedef struct SQInfo { */ int32_t tableIndex; int32_t numOfGroupResultPages; - STableDataInfo* pTableDataInfo; TSKEY* tsList; } SQInfo; -/** - * create the qinfo object before adding the query task to each tsdb query worker - * - * @param pReadMsg - * @param pQInfo - * @return - */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); - -/** - * destroy the query info struct - * @param pQInfo - */ -void qDestroyQueryInfo(SQInfo* pQInfo); - -/** - * query on single table - * @param pReadMsg - */ -void qTableQuery(SQInfo* pQInfo); - -/** - * wait for the query completed, and retrieve final results to client - * @param pQInfo - */ -int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo); - -/** - * - * @param pQInfo - * @param pRsp - * @return - */ -int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); - -/** - * - * @param pQInfo - * @return - */ -bool qHasMoreResultsToRetrieve(SQInfo* pQInfo); - #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qast.c b/src/query/src/qast.c index 809a5202f2df03fcb8ef5b30bbf15aaf7983be81..8855caee56c77f7af9fe3d8813ae3b7c8c0b21b0 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -544,7 +544,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC); } - __compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type); + __compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0); if (pCond->start != NULL) { int32_t optr = pCond->start->optr; @@ -569,7 +569,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty if (comp) { ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key); - assert(ret <= 0); + assert(ret >= 0); } if (ret == 0 && optr == TSDB_RELATION_GREATER) { @@ -595,8 +595,8 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty SSkipListNode* pNode = tSkipListIterGet(iter); if (comp) { - ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key); - assert(ret >= 0); + ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->end->v.i64Key); + assert(ret <= 0); } if (ret == 0 && optr == TSDB_RELATION_LESS) { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a507bf3184d4e8712c0b03fdd0a483227b422330..04f3a711fdae8c312b5fe4b143d0abcbb98364ae 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -27,7 +27,8 @@ #include "qresultBuf.h" #include "queryExecutor.h" #include "queryUtil.h" -#include "tsdb.h" +#include "query.h" +#include "tsdbMain.h" //todo use TableId instead of STable object #define DEFAULT_INTERN_BUF_SIZE 16384L @@ -92,7 +93,7 @@ enum { TS_JOIN_TAG_NOT_EQUALS = 2, }; -static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end); +static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); @@ -104,7 +105,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag); static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); -static int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTableQueryInfo); +static int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); @@ -2185,7 +2186,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table - size_t s = taosArrayGetSize(pQInfo->pTableList); + size_t s = pQInfo->groupInfo.numOfTables; num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset num = 1;//pQInfo->pSidSet->numOfSubSet; @@ -2253,7 +2254,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void // get one queried meter assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[0]->sid); + // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vnodeIndex = -1; @@ -2298,10 +2299,8 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void .colList = pQuery->colList, }; - SArray *sa = taosArrayInit(1, POINTER_BYTES); - // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid); + // SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); // taosArrayPush(sa, &p1); // } @@ -2310,7 +2309,7 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, sa, cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols); // metric query do not invoke interpolation, it will be done at the second-stage merge if (!isPointInterpoQuery(pQuery)) { @@ -2331,18 +2330,18 @@ int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void */ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { - // assert(taosHashGetSize(pQInfo->pTableList) >= 1); + // assert(taosHashGetSize(pQInfo->groupInfo) >= 1); } #if 0 - if (pQInfo == NULL || pQInfo->numOfTables == 1) { + if (pQInfo == NULL || pQInfo->groupInfo.numOfTables == 1) { atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); } else { int32_t num = 0; - for (int32_t i = 0; i < pQInfo->numOfTables; ++i) { - SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pTableIdList[i]->sid); + for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { + SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { @@ -2356,9 +2355,9 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { * in order to reduce log output, for all meters of which numOfQueries count are 0, * we do not output corresponding information */ - num = pQInfo->numOfTables - num; + num = pQInfo->groupInfo.numOfTables - num; dTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo, - pQInfo->numOfTables, num); + pQInfo->groupInfo.numOfTables, num); } #endif } @@ -2663,32 +2662,27 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(SColumnModel *pTagSchema, int32_t tagColIdx, void *pMeterSidInfo, tVariant *param) { +static void doSetTagValueInParam(void* tsdb, STableId id, int32_t tagColIdx, tVariant *param) { assert(tagColIdx >= 0); -#if 0 - int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx); - - void * pStr = (char *)pMeterSidInfo->tags + offset; - SSchema *pCol = getColumnModelSchema(pTagSchema, tagColIdx); tVariantDestroy(param); - - if (isNull(pStr, pCol->type)) { - param->nType = TSDB_DATA_TYPE_NULL; - } else { - tVariantCreateFromBinary(param, pStr, pCol->bytes, pCol->type); - } -#endif + + char* val = NULL; + int16_t bytes = 0; + int16_t type = 0; + + tsdbGetTableTagVal(tsdb, id, tagColIdx, &type, &bytes, &val); + tVariantCreateFromBinary(param, val, bytes, type); } -void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, void *pMeterSidInfo) { +void vnodeSetTagValueInParam(STableGroupInfo *groupList, SQueryRuntimeEnv *pRuntimeEnv, STableId id, void* tsdb) { SQuery * pQuery = pRuntimeEnv->pQuery; - SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel; +// SColumnModel *pTagSchema = NULL;//pSidSet->pColumnModel; SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { assert(pFuncMsg->numOfParams == 1); - doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); + doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) { @@ -2699,14 +2693,14 @@ void vnodeSetTagValueInParam(STableGroupList *pSidSet, SQueryRuntimeEnv *pRuntim continue; } - doSetTagValueInParam(pTagSchema, pColEx->colIndex, pMeterSidInfo, &pRuntimeEnv->pCtx[idx].tag); + doSetTagValueInParam(tsdb, id, pColEx->colIndex, &pRuntimeEnv->pCtx[idx].tag); } // set the join tag for first column if (pFuncMsg->functionId == TSDB_FUNC_TS && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pRuntimeEnv->pTSBuf != NULL) { assert(pFuncMsg->numOfParams == 1); - doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); +// doSetTagValueInParam(pTagSchema, pFuncMsg->arg->argValue.i64, pMeterSidInfo, &pRuntimeEnv->pCtx[0].tag); } } } @@ -2915,10 +2909,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t ret = TSDB_CODE_SUCCESS; // while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { - int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; - int32_t end = taosArrayGetSize(pQInfo->pTableList) - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; +// int32_t start = 0;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; +// int32_t end = pQInfo->groupInfo.numOfTables - 1;//pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; - ret = mergeIntoGroupResultImpl(pQInfo, pQInfo->pTableDataInfo, start, end); + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; } @@ -3015,22 +3010,26 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW return maxOutput; } -int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, STableDataInfo *pTableDataInfo, int32_t start, int32_t end) { +int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* pGroup) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - tFilePage ** buffer = (tFilePage **)pQuery->sdata; - int32_t * posList = calloc((end - start), sizeof(int32_t)); - STableDataInfo **pTableList = malloc(POINTER_BYTES * (end - start)); + size_t size = taosArrayGetSize(pGroup); + + tFilePage **buffer = (tFilePage **)pQuery->sdata; + int32_t *posList = calloc(size, sizeof(int32_t)); + + STableDataInfo **pTableList = malloc(POINTER_BYTES * size); // todo opt for the case of one table per group int32_t numOfTables = 0; - for (int32_t i = start; i < end; ++i) { - int32_t tid = pTableDataInfo[i].pTableQInfo->tid; - - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tid); - if (list.size > 0 && pTableDataInfo[i].pTableQInfo->windowResInfo.size > 0) { - pTableList[numOfTables] = &pTableDataInfo[i]; + for (int32_t i = 0; i < size; ++i) { + SPair* p = taosArrayGet(pGroup, i); + STableQueryInfo* pInfo = p->sec; + + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->tid); + if (list.size > 0 && pInfo->windowResInfo.size > 0) { +// pTableList[numOfTables] = &pTableDataInfo[i]; numOfTables += 1; } } @@ -3258,14 +3257,12 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { } if (isIntervalQuery(pQuery)) { - size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); - - for (int32_t i = 0; i < numOfTables; ++i) { - STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; - SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; - - doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); - } +// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { +// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; +// SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; +// +// doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); +// } } else { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); @@ -3557,7 +3554,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } if (pRuntimeEnv->pSecQueryHandle != NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, pQInfo->pTableList, cols); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols); } taosArrayDestroy(cols); @@ -3654,7 +3651,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid return pTableQueryInfo; } -void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) { +UNUSED_FUNC void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) { if (pTableQueryInfo == NULL) { return; } @@ -3700,7 +3697,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32_t meterIdx, int32_t groupIdx, +void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable* pTable, int32_t groupIdx, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -3726,7 +3723,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, int32 initCtxOutputBuf(pRuntimeEnv); pTableQueryInfo->lastKey = nextKey; - setAdditionalInfo(pQInfo, meterIdx, pTableQueryInfo); + setAdditionalInfo(pQInfo, pTable, pTableQueryInfo); } static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { @@ -3754,11 +3751,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * } } -int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTableQueryInfo) { +int32_t setAdditionalInfo(SQInfo *pQInfo, STable* pTable, STableQueryInfo *pTableQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; assert(pTableQueryInfo->lastKey > 0); - // vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]); + vnodeSetTagValueInParam(&pQInfo->groupInfo, pRuntimeEnv, pTable->tableId, pQInfo->tsdb); // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { @@ -3866,7 +3863,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { - totalSubset = 1;//pQInfo->pSidSet->numOfSubSet; + totalSubset = taosArrayGetSize(pQInfo->groupInfo.pGroupList); } return totalSubset; @@ -4193,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, pQInfo->pTableList, cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols); taosArrayDestroy(cols); pQInfo->tsdb = tsdb; @@ -4294,7 +4291,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupList *pSidset) { +static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STableGroupInfo *pSidset) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { return false; } @@ -4335,7 +4332,6 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; int64_t st = taosGetTimestampMs(); - size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { @@ -4345,15 +4341,26 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); STableDataInfo* pTableDataInfo = NULL; + STable* pTable = NULL; - // todo opt performance - for(int32_t i = 0; i < numOfTables; ++i) { - if (pQInfo->pTableDataInfo[i].pTableQInfo->tid == blockInfo.sid) { - pTableDataInfo = &pQInfo->pTableDataInfo[i]; - break; + // todo opt performance using hash table + size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + for(int32_t i = 0; i < numOfGroup; ++i) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + + size_t num = taosArrayGetSize(group); + for(int32_t j = 0; j < num; ++j) { + SPair* p = taosArrayGet(group, j); + STableDataInfo* pInfo = p->sec; + + if (pInfo->pTableQInfo->tid == blockInfo.sid) { + pTableDataInfo = p->sec; + pTable = p->first; + break; + } } } - + assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL); STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; @@ -4364,10 +4371,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { TSKEY nextKey = blockInfo.window.ekey; if (!isIntervalQuery(pQuery)) { - setExecutionContext(pQInfo, pTableQueryInfo, pTableDataInfo->tableIndex, pTableDataInfo->groupIdx, nextKey); + setExecutionContext(pQInfo, pTableQueryInfo, pTable, pTableDataInfo->groupIdx, nextKey); } else { // interval query setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); - int32_t ret = setAdditionalInfo(pQInfo, pTableDataInfo->tableIndex, pTableQueryInfo); + int32_t ret = setAdditionalInfo(pQInfo, pTable, pTableQueryInfo); if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; @@ -4493,7 +4500,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_COMPLETED); #if 0 -// STableGroupList *pTableIdList = pSupporter->pSidSet; +// STableGroupInfo *pTableIdList = pSupporter->pSidSet; int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; @@ -4747,35 +4754,32 @@ static void createTableDataInfo(SQInfo* pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; // todo make sure the table are added the reference count to gauranteed that all involved tables are valid - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList); - - if (pQInfo->pTableDataInfo == NULL) { - pQInfo->pTableDataInfo = (STableDataInfo *)calloc(1, sizeof(STableDataInfo) * numOfTables); - if (pQInfo->pTableDataInfo == NULL) { - dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); - pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; - return; - } + size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + int32_t index = 0; + for (int32_t i = 0; i < numOfGroups; ++i) { // load all meter meta info + SArray *group = *(SArray**) taosArrayGet(pQInfo->groupInfo.pGroupList, i); - int32_t groupId = 0; - for (int32_t i = 0; i < numOfTables; ++i) { // load all meter meta info - STableId *id = taosArrayGet(pQInfo->pTableList, i); - STableDataInfo *pInfo = &pQInfo->pTableDataInfo[i]; + size_t s = taosArrayGetSize(group); + for(int32_t j = 0; j < s; ++j) { + SPair* p = (SPair*) taosArrayGet(group, j); + STableDataInfo* pInfo = calloc(1, sizeof(STableDataInfo)); + + setTableDataInfo(pInfo, index, i); + pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, ((STable*)(p->first))->tableId.tid, pQuery->window); - setTableDataInfo(pInfo, i, groupId); - pInfo->pTableQInfo = createTableQueryInfo(&pQInfo->runtimeEnv, id->tid, pQuery->window); + p->sec = pInfo; + index += 1; } } } static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); +// SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - for (int32_t i = 0; i < numOfTables; ++i) { - STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; - changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); - } +// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { +// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; +// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); +// } } static void doSaveContext(SQInfo* pQInfo) { @@ -4809,12 +4813,23 @@ static void doRestoreContext(SQInfo* pQInfo) { static void doCloseAllTimeWindowAfterScan(SQInfo* pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t numOfTables = taosArrayGetSize(pQInfo->pTableList); if (isIntervalQuery(pQuery)) { - for (int32_t i = 0; i < numOfTables; ++i) { - STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; - closeAllTimeWindow(&pTableQueryInfo->windowResInfo); +// for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { +// STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; +// closeAllTimeWindow(&pTableQueryInfo->windowResInfo); +// } + size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + for(int32_t i = 0; i < numOfGroup; ++i) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + + size_t num = taosArrayGetSize(group); + for(int32_t j = 0; j < num; ++j) { + SPair* p = taosArrayGet(group, j); + STableDataInfo* pInfo = p->sec; + + closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo); + } } } else { // close results for group result closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); @@ -5140,15 +5155,15 @@ static void tableQueryImpl(SQInfo* pQInfo) { // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - assert(taosArrayGetSize(pQInfo->pTableList) == 1); + assert(pQInfo->groupInfo.numOfTables == 1); /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - STableId* pTableId = taosArrayGet(pQInfo->pTableList, 0); - dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", - pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); +// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0); +// dTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", +// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); } sem_post(&pQInfo->dataReady); @@ -5175,8 +5190,7 @@ static void stableQueryImpl(SQInfo* pQInfo) { // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); if (pQuery->rec.rows == 0) { - int32_t numOfTables = taosArrayGetSize(pQInfo->pTableList); - dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total); + dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); } @@ -5410,8 +5424,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); pQueryMsg->orderType = htons(pQueryMsg->orderType); - - pMsg += sizeof(SColIndex) * pQueryMsg->numOfGroupCols; } pQueryMsg->interpoType = htons(pQueryMsg->interpoType); @@ -5710,10 +5722,10 @@ static void doUpdateExprColumnIndex(SQuery* pQuery) { } static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, - SArray *pTableList) { + STableGroupInfo *groupInfo) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { - goto _clean_pQInfo_memory; + return NULL; } SQuery *pQuery = calloc(1, sizeof(SQuery)); @@ -5808,7 +5820,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - pQInfo->pTableList = pTableList; + pQInfo->groupInfo = *groupInfo; pQuery->pos = -1; @@ -5842,7 +5854,6 @@ _clean_memory: tfree(pExprs); tfree(pGroupbyExpr); -_clean_pQInfo_memory: tfree(pQInfo); return NULL; @@ -5918,14 +5929,12 @@ static void freeQInfo(SQInfo *pQInfo) { sem_destroy(&(pQInfo->dataReady)); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->pTableList); - for (int32_t j = 0; j < 0; ++j) { - destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); - } - } - - tfree(pQInfo->pTableDataInfo); +// if (pQInfo->pTableDataInfo != NULL) { + // size_t num = taosHashGetSize(pQInfo->groupInfo); +// for (int32_t j = 0; j < 0; ++j) { +// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); +// } +// } for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; @@ -5958,7 +5967,7 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); - taosArrayDestroy(pQInfo->pTableList); + taosArrayDestroy(pQInfo->groupInfo.pGroupList); dTrace("QInfo:%p QInfo is freed", pQInfo); @@ -6023,7 +6032,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // todo if interpolation exists, the result may be dump to client by several rounds } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6061,7 +6070,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) } bool isSTableQuery = false; - SArray* pGroupList = NULL; + STableGroupInfo* groupInfo = calloc(1, sizeof(STableGroupInfo)); if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { isSTableQuery = true; @@ -6069,8 +6078,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; //todo fix me - /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &pGroupList, pGroupColIndex, pQueryMsg->numOfGroupCols); - if (taosArrayGetSize(pGroupList) == 0) { // no qualified tables no need to do query + /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); + if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; } @@ -6078,12 +6087,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) assert(taosArrayGetSize(pTableIdList) == 1); STableId* id = taosArrayGet(pTableIdList, 0); - if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &pGroupList)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, groupInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; } } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pGroupList); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, groupInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -6113,12 +6122,14 @@ _query_over: return TSDB_CODE_SUCCESS; } -void qDestroyQueryInfo(SQInfo* pQInfo) { +void qDestroyQueryInfo(qinfo_t pQInfo) { dTrace("QInfo:%p query completed", pQInfo); freeQInfo(pQInfo); } -void qTableQuery(SQInfo *pQInfo) { +void qTableQuery(qinfo_t qinfo) { + SQInfo* pQInfo = (SQInfo*) qinfo; + if (pQInfo == NULL || pQInfo->signature != pQInfo) { dTrace("%p freed abort query", pQInfo); return; @@ -6140,7 +6151,9 @@ void qTableQuery(SQInfo *pQInfo) { // vnodeDecRefCount(pQInfo); } -int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { +int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { + SQInfo* pQInfo = (SQInfo*) qinfo; + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } @@ -6158,7 +6171,9 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { return pQInfo->code; } -bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { +bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { + SQInfo* pQInfo = (SQInfo*) qinfo; + if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { return false; } @@ -6175,7 +6190,9 @@ bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { } } -int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) { +int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen) { + SQInfo* pQInfo = (SQInfo*) qinfo; + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 39a993a10650370197d9e3b1e8cbb0d5f6b75655..5d443d2290d70f01bf10e1081a9c1b547ade40dd 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { } } +int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val) { + STsdbMeta* pMeta = tsdbGetMeta(repo); + STable* pTable = tsdbGetTableByUid(pMeta, id.uid); + + STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); + STColumn* pCol = schemaColAt(pSchema, col); + + SDataRow row = (SDataRow)pTable->tagVal; + char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); + + *val = d; + *type = pCol->type; + *bytes = pCol->bytes; + + return 0; +} + int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { if (tsdbCheckTableCfg(pCfg) < 0) return -1; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c68701dcaaa53dd6a5e16db31fc7059d0499ad3b..6b3b7e1e4e3d62b5e6807cc012c3c0c73ea14fec 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -18,11 +18,10 @@ #include "talgo.h" #include "tlog.h" #include "tutil.h" +#include "tcompare.h" -#include "../../../query/inc/qast.h" -#include "../../../query/inc/qextbuffer.h" -#include "../../../query/inc/tlosertree.h" -#include "../../../query/inc/tsqlfunction.h" +#include "../../../query/inc/qast.h" // todo move to common module +#include "../../../query/inc/tlosertree.h" // todo move to util module #include "tsdb.h" #include "tsdbMain.h" @@ -143,7 +142,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* groupList, SArray* pColumnInfo) { +tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) { // todo 1. filter not exist table // todo 2. add the reference count for each table that is involved in query @@ -157,22 +156,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->isFirstSlot = true; pQueryHandle->cur.fid = -1; - size_t size = taosArrayGetSize(groupList); - assert(size >= 1); + size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); + assert(sizeOfGroup >= 1); - pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); - for (int32_t i = 0; i < size; ++i) { - SArray* group = *(SArray**)taosArrayGet(groupList, i); + pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); + + for (int32_t i = 0; i < sizeOfGroup; ++i) { + SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); size_t gsize = taosArrayGetSize(group); + assert(gsize > 0); + for (int32_t j = 0; j < gsize; ++j) { - STable* pTable = *(STable**)taosArrayGet(group, j); - assert(pTable != NULL); + SPair* d = (SPair*) taosArrayGet(group, j); + assert(d->first != NULL); STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, - .tableId = pTable->tableId, - .pTableObj = pTable, + .tableId = ((STable*) d->first)->tableId, + .pTableObj = d->first, }; taosArrayPush(pQueryHandle->pTableCheckInfo, &info); @@ -1143,7 +1145,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) { SSkipListNode* pNode = tSkipListIterGet(iter); STable* t = *(STable**)SL_GET_NODE_DATA(pNode); - taosArrayPush(list, t); + taosArrayPush(list, &t); } return TSDB_CODE_SUCCESS; @@ -1167,7 +1169,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) { size_t size = taosArrayGetSize(pTableList); for (int32_t i = 0; i < size; ++i) { STable* pTable = taosArrayGetP(pTableList, i); - taosArrayPush(pRes, &pTable->tableId); + taosArrayPush(pRes, &pTable); } } @@ -1181,24 +1183,20 @@ static void destroyHelper(void* param) { free(param); } -static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) { +static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) { *index = 0; - *offset = 0; // filter on table name(TBNAME) if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) { *index = TSDB_TBNAME_COLUMN_INDEX; - *offset = TSDB_TBNAME_COLUMN_INDEX; return; } while ((*index) < pSupporter->numOfTags) { if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes && - pSupporter->pTagSchema[*index].type == pSchema->type && + pSupporter->pTagSchema[*index].type == pSchema->type && pSupporter->pTagSchema[*index].colId == pSchema->colId) { break; - } else { - (*offset) += pSupporter->pTagSchema[(*index)++].bytes; } } } @@ -1219,15 +1217,14 @@ void filterPrepare(void* expr, void* param) { tVariant* pCond = pExpr->_node.pRight->pVal; SSchema* pSchema = pExpr->_node.pLeft->pSchema; - getTagColumnInfo(pSupporter, pSchema, &i, &offset); + getTagColumnInfo(pSupporter, pSchema, &i); assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX)); assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX)); - pInfo->sch = *pSchema; + pInfo->sch = *pSchema; pInfo->colIndex = i; - pInfo->optr = pExpr->_node.optr; - pInfo->offset = offset; - // pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr); + pInfo->optr = pExpr->_node.optr; + pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr); tVariantAssign(&pInfo->q, pCond); tVariantTypeSetType(&pInfo->q, pInfo->sch.type); @@ -1306,26 +1303,33 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { } void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { - SArray* g = taosArrayInit(16, POINTER_BYTES); - taosArrayPush(g, &pTables[0]); + SArray* g = taosArrayInit(16, sizeof(SPair)); + + SPair p = {.first = pTables[0]}; + taosArrayPush(g, &p); for (int32_t i = 1; i < numOfTables; ++i) { int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp); assert(ret == 0 || ret == -1); if (ret == 0) { - taosArrayPush(g, &pTables[i]); + SPair p1 = {.first = pTables[i]}; + taosArrayPush(g, &p1); } else { taosArrayPush(pGroups, &g); // current group is ended, start a new group - g = taosArrayInit(16, POINTER_BYTES); - taosArrayPush(g, &pTables[i]); + + SPair p1 = {.first = pTables[i]}; + taosArrayPush(g, &p1); } } + + taosArrayPush(pGroups, &g); } SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { - assert(pTableList != NULL && taosArrayGetSize(pTableList) > 0); + assert(pTableList != NULL); + SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); size_t size = taosArrayGetSize(pTableList); @@ -1335,7 +1339,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC } if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table - taosArrayPush(pTableGroup, pTableList); + size_t num = taosArrayGetSize(pTableList); + + SArray* sa = taosArrayInit(num, sizeof(SPair)); + for(int32_t i = 0; i < num; ++i) { + STable* pTable = taosArrayGetP(pTableList, i); + + SPair p = {.first = pTable}; + taosArrayPush(sa, &p); + } + + taosArrayPush(pTableGroup, &sa); pTrace("all %d tables belong to one group", size); #ifdef _DEBUG_VIEW @@ -1362,7 +1376,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC bool tSkipListNodeFilterCallback(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*)param; - STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); + STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); char* val = dataRowTuple(pTable->tagVal); // todo not only the first column int8_t type = pInfo->sch.type; @@ -1419,7 +1433,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)}; SBinaryFilterSupp supp = { - .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s}; + .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s, + }; SArray* pTableList = taosArrayInit(8, POINTER_BYTES); @@ -1430,7 +1445,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList, +int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); @@ -1448,9 +1463,9 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size taosArrayDestroy(res); return ret; } - - *pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); - taosArrayDestroy(res); + + pGroupInfo->numOfTables = taosArrayGetSize(res); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); return ret; } @@ -1465,25 +1480,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size } doQueryTableList(pSTable, res, pExprNode); - *pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); + + pGroupInfo->numOfTables = taosArrayGetSize(res); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); - taosArrayDestroy(res); return ret; } -int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList) { +int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo) { STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); if (pTable == NULL) { return TSDB_CODE_INVALID_TABLE_ID; } //todo assert table type, add the table ref count + pGroupInfo->numOfTables = 1; + pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - *pGroupList = taosArrayInit(1, POINTER_BYTES); SArray* group = taosArrayInit(1, POINTER_BYTES); taosArrayPush(group, &pTable); - taosArrayPush(*pGroupList, &group); + taosArrayPush(pGroupInfo->pGroupList, &group); return TSDB_CODE_SUCCESS; } diff --git a/src/util/inc/tcompare.h b/src/util/inc/tcompare.h index b8778b80c884804291f562673ca4445292ad68aa..3484b5b8baf7819811e19346ff7c8d1a211d7a41 100644 --- a/src/util/inc/tcompare.h +++ b/src/util/inc/tcompare.h @@ -34,31 +34,13 @@ typedef struct SPatternCompareInfo { char matchOne; // symbol for match one wildcard, default: '_' } SPatternCompareInfo; -int32_t compareInt32Val(const void *pLeft, const void *pRight); - -int32_t compareInt64Val(const void *pLeft, const void *pRight); - -int32_t compareInt16Val(const void *pLeft, const void *pRight); - -int32_t compareInt8Val(const void *pLeft, const void *pRight); - -int32_t compareIntDoubleVal(const void *pLeft, const void *pRight); - -int32_t compareDoubleIntVal(const void *pLeft, const void *pRight); - -int32_t compareDoubleVal(const void *pLeft, const void *pRight); - -int32_t compareStrVal(const void *pLeft, const void *pRight); - -int32_t compareWStrVal(const void *pLeft, const void *pRight); - int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo); int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo); __compar_fn_t getKeyComparFunc(int32_t keyType); -__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType); +__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr); #ifdef __cplusplus } diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 3420e5b536ad8ea373ce77ece9299c44ef9195ef..ed58c2e60d86b9bb11d71c132295718657b4add4 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -107,6 +107,11 @@ extern "C" { #define POW2(x) ((x) * (x)) +typedef struct SPair { + void* first; + void* sec; +} SPair; + int32_t strdequote(char *src); void strtrim(char *src); diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 848eeee5738ca07e0a4d73c0c9ea1bc920e480d8..4505ea533a2725869e159c061b9dbb0baa78e050 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -40,25 +40,23 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) { } int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) { - // int64_t lhs = ((SSkipListKey *)pLeft)->i64Key; - // double rhs = ((SSkipListKey *)pRight)->dKey; - // if (fabs(lhs - rhs) < FLT_EPSILON) { - // return 0; - // } else { - // return (lhs > rhs) ? 1 : -1; - // } - return 0; + int64_t lhs = GET_INT64_VAL(pLeft); + double rhs = GET_DOUBLE_VAL(pRight); + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } } int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { - // double lhs = ((SSkipListKey *)pLeft)->dKey; - // int64_t rhs = ((SSkipListKey *)pRight)->i64Key; - // if (fabs(lhs - rhs) < FLT_EPSILON) { - // return 0; - // } else { - // return (lhs > rhs) ? 1 : -1; - // } - return 0; + double lhs = GET_DOUBLE_VAL(pLeft); + int64_t rhs = GET_INT64_VAL(pRight); + if (fabs(lhs - rhs) < FLT_EPSILON) { + return 0; + } else { + return (lhs > rhs) ? 1 : -1; + } } int32_t compareDoubleVal(const void *pLeft, const void *pRight) { @@ -241,7 +239,8 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; } -__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { +// todo promote the type definition before the comparsion +__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr) { __compar_fn_t comparFn = NULL; switch (type) { @@ -250,10 +249,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: { +// assert(type == filterDataType); + if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) { comparFn = compareInt64Val; - break; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { + comparFn = compareIntDoubleVal; } + + break; } case TSDB_DATA_TYPE_BOOL: { if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { @@ -265,22 +269,37 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { } case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_DOUBLE: { -// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { -// comparFn = compareDoubleIntVal; -// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { -// comparFn = compareDoubleVal; -// } - if (filterDataType == TSDB_DATA_TYPE_DOUBLE) { + if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { + comparFn = compareDoubleIntVal; + } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { comparFn = compareDoubleVal; } break; } - case TSDB_DATA_TYPE_BINARY: - comparFn = compareStrVal; + case TSDB_DATA_TYPE_BINARY: { + assert(filterDataType == TSDB_DATA_TYPE_BINARY); + + if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */ + comparFn = compareStrPatternComp; + } else { /* normal relational comparFn */ + comparFn = compareStrVal; + } + break; - case TSDB_DATA_TYPE_NCHAR: - comparFn = compareWStrVal; + } + + case TSDB_DATA_TYPE_NCHAR: { + assert(filterDataType == TSDB_DATA_TYPE_NCHAR); + + if (optr == TSDB_RELATION_LIKE) { + comparFn = compareWStrPatternComp; + } else { + comparFn = compareWStrVal; + } + break; + } + default: comparFn = compareInt32Val; break; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 8ce4530dbf997b6673d4e8cdd903428ea0a07ddc..d91230164115408c029aee93e0a708c533aca8b8 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -110,12 +110,7 @@ short tsDaysPerFile = 10; int tsDaysToKeep = 3650; int tsReplications = TSDB_REPLICA_MIN_NUM; -#ifdef _MPEER int tsNumOfMPeers = 3; -#else -int tsNumOfMPeers = 1; -#endif - int tsMaxShellConns = 2000; int tsMaxTables = 100000; @@ -556,7 +551,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 32, 4096, 0, TSDB_CFG_UTYPE_NONE); -#ifdef _MPEER +#ifdef _SYNC tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER, 1, 3, 0, TSDB_CFG_UTYPE_NONE); diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 867309c1630d6a1a3cef056725c0d7e3aaf60a43..2add3ad8498255c48d5b5dde3ea44bd40738ff64 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -308,7 +308,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { pSkipList->state.queryCount++; #endif - __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType); + __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType, 0); int32_t ret = -1; for (int32_t i = sLevel; i >= 0; --i) { SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); @@ -372,7 +372,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; int32_t ret = -1; - __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type); + __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type, 0); SSkipListNode* pNode = pSkipList->pHead; for (int32_t i = pSkipList->level - 1; i >= 0; --i) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 59f392eb82f03b34a612cd5d3ce2f823e7fde958..7fcd02a102e6a5099b46ff9828c74e4edb2b4d83 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -25,7 +25,7 @@ #include "dataformat.h" #include "vnode.h" #include "vnodeInt.h" -#include "queryExecutor.h" +#include "query.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); @@ -54,7 +54,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont int32_t code = TSDB_CODE_SUCCESS; - SQInfo* pQInfo = NULL; + qinfo_t pQInfo = NULL; if (contLen != 0) { void* tsdb = vnodeGetTsdb(pVnode); pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index edca4e371cdbc09cfe98f26e7b7972348c1e8098..7ec55bbf1f1e44cf319d60e75f5da7753ef762d3 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -35,6 +35,7 @@ #define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} typedef struct { + uint64_t version; int fd; int keep; int level; @@ -50,7 +51,7 @@ int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; static int walHandleExistingFiles(const char *path); -static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp); +static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); static int walRemoveWalFiles(const char *path); void *walOpen(const char *path, const SWalCfg *pCfg) { @@ -68,6 +69,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { if (access(path, F_OK) != 0) mkdir(path, 0755); + if (pCfg->keep == 1) return pWal; + if (walHandleExistingFiles(path) == 0) walRenew(pWal); @@ -154,6 +157,7 @@ int walWrite(void *handle, SWalHead *pHead) { // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; + if (pHead->version <= pWal->version) return 0; pHead->signature = walSignature; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); @@ -162,7 +166,9 @@ int walWrite(void *handle, SWalHead *pHead) { if(write(pWal->fd, pHead, contLen) != contLen) { wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); code = -1; - } + } else { + pWal->version = pHead->version; + } return code; } @@ -184,7 +190,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) int plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN+5]; - sprintf(opath, "%s/old", pWal->path); + + int slen = sprintf(opath, "%s", pWal->path); + if ( pWal->keep == 0) + strcpy(opath+slen, "/old"); // is there old directory? if (access(opath, F_OK)) return 0; @@ -199,6 +208,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } } + if (count == 0) return 0; + if ( count != (maxId-minId+1) ) { wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); code = -1; @@ -206,17 +217,29 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) wTrace("wal:%s, %d files will be restored", opath, count); for (index = minId; index<=maxId; ++index) { - sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index); - code = walRestoreWalFile(pWal->name, pVnode, writeFp); + sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index); + code = walRestoreWalFile(pWal, pVnode, writeFp); if (code < 0) break; } } if (code == 0) { - code = walRemoveWalFiles(opath); - if (code == 0) { - if (remove(opath) < 0) { - wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + if (pWal->keep == 0) { + code = walRemoveWalFiles(opath); + if (code == 0) { + if (remove(opath) < 0) { + wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + code = -1; + } + } + } else { + // open the existing WAL file in append mode + pWal->num = count; + pWal->id = maxId; + sprintf(pWal->name, "%s/%s%d", opath, walPrefix, maxId); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno)); code = -1; } } @@ -252,8 +275,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) { - int code = 0; +static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { + int code = 0; + char *name = pWal->name; char *buffer = malloc(1024000); // size for one record if (buffer == NULL) return -1; @@ -289,10 +313,11 @@ static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) break; } - // write into queue + if (pWal->keep) pWal->version = pHead->version; (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } + close(fd); free(buffer); return code; @@ -365,4 +390,3 @@ static int walRemoveWalFiles(const char *path) { return code; } - diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index 8e10bc11e54ba5e8e39c8116d5a1f5c86206a362..53f3477c7bf786f1399e503c24981b4c32a34618 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -29,8 +29,6 @@ int writeToQueue(void *pVnode, void *data, int type) { ver = pHead->version; walWrite(pWal, pHead); - - free(pHead); return 0; } @@ -42,6 +40,7 @@ int main(int argc, char *argv[]) { int total = 5; int rows = 10000; int size = 128; + int keep = 0; for (int i=1; i