提交 a1a706e6 编写于 作者: H hzcheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -71,6 +71,7 @@ typedef struct STableMeta { ...@@ -71,6 +71,7 @@ typedef struct STableMeta {
typedef struct STableMetaInfo { typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SVgroupsInfo* vgroupList; SVgroupsInfo* vgroupList;
/* /*
* 1. keep the vnode index during the multi-vnode super table projection query * 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion * 2. keep the vnode index for multi-vnode insertion
......
...@@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -47,7 +47,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param; pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM; pSql->maxRetry = 1;
pSql->fp = fp; pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......
...@@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -651,19 +651,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table } else { // query super table
int32_t index = pTableMetaInfo->vgroupIndex;
if (pTableMetaInfo->vgroupIndex < 0) { if (index < 0) {
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vgroupIndex); tscError("%p error vgroupIndex:%d", pSql, index);
return -1; return -1;
} }
pSql->ipList.numOfIps = 1; // todo fix me SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me
pSql->ipList.port = tsDnodeShellPort; pSql->ipList.port = tsDnodeShellPort;
pSql->ipList.inUse = 0; pSql->ipList.inUse = 0;
// todo extract method for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[pTableMetaInfo->vgroupIndex]; pSql->ipList.ip[i] = pVgroupInfo->ipAddr[i].ip;
pSql->ipList.ip[0] = pVgroupInfo->ipAddr[0].ip; }
#if 0 #if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
...@@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -676,8 +679,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
#endif #endif
tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);
pTableMetaInfo->vgroupIndex);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
numOfTables = 1; numOfTables = 1;
...@@ -823,7 +825,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -823,7 +825,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg); pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg);
SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr;
if (pGroupbyExpr->numOfGroupCols != 0) { if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType); pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
...@@ -2133,10 +2135,11 @@ _error_clean: ...@@ -2133,10 +2135,11 @@ _error_clean:
for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) { for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i]; SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i];
pVgroups->numOfIps = htonl(pVgroups->numOfIps);
pVgroups->vgId = htonl(pVgroups->vgId); pVgroups->vgId = htonl(pVgroups->vgId);
assert(pVgroups->numOfIps >= 1);
for(int32_t j = 0; j < tListLen(pVgroups->ipAddr); ++j) { for(int32_t j = 0; j < pVgroups->numOfIps; ++j) {
pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip); pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip);
pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port); pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
} }
......
...@@ -690,7 +690,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -690,7 +690,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data // current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem); sem_wait(&pSql->rspSem);
...@@ -773,8 +774,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -773,8 +774,9 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pRes == NULL || pRes->qhandle == 0) { if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */ /* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
STscObj* pTscObj = pSql->pTscObj;
if (tscShouldFreeAsyncSqlObj(pSql)) { if (pTscObj->pSql != pSql) {
tscTrace("%p SqlObj is freed by app", pSql); tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} else { } else {
......
...@@ -1207,7 +1207,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1207,7 +1207,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// all subqueries are failed // all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code); tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, pState->code);
pPObj->res.code = -(pState->code); pPObj->res.code = pState->code;
// release allocated resource // release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
...@@ -1336,12 +1336,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1336,12 +1336,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSqlRes * pRes = &pSql->res; SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx);
// SVnodeSidList *vnodeInfo = 0;
// SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
if (numOfRows > 0) { if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
...@@ -1384,11 +1378,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1384,11 +1378,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
pthread_mutex_unlock(&trsupport->queryMutex); pthread_mutex_unlock(&trsupport->queryMutex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
} }
pthread_mutex_unlock(&trsupport->queryMutex);
} else { // all data has been retrieved to client } else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql); tscAllDataRetrievedFromDnode(trsupport, pSql);
} }
pthread_mutex_unlock(&trsupport->queryMutex);
} }
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
...@@ -1479,10 +1473,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1479,10 +1473,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscHandleSubqueryError(param, tres, pState->code); tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode } else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
}
} }
} }
......
...@@ -2128,27 +2128,23 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2128,27 +2128,23 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
// SSuperTableMeta* pMetricMeta = NULL;
// if (cmd == TSDB_SQL_SELECT) {
// pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
// }
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags, pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex); pTableMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object. } else { // transfer the ownership of pTableMeta to the newly create sql object.
// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
// pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags, STableMeta* pPrevTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
// pTableMetaInfo->tagColumnIndex); SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pPrevInfo->vgroupList = NULL;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
} }
assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1); assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
// assert(pFinalInfo->pMetricMeta != NULL); assert(pFinalInfo->vgroupList != NULL);
} }
if (cmd == TSDB_SQL_SELECT) { if (cmd == TSDB_SQL_SELECT) {
......
...@@ -23,8 +23,6 @@ extern "C" { ...@@ -23,8 +23,6 @@ extern "C" {
int32_t dnodeInitMClient(); int32_t dnodeInitMClient();
void dnodeCleanupMClient(); void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void * dnodeGetMnodeList();
int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,11 +22,11 @@ ...@@ -22,11 +22,11 @@
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "vnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "vnode.h"
static int32_t dnodeOpenVnodes(); static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes(); static void dnodeCloseVnodes();
......
...@@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { ...@@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus); dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus);
tsModule[TSDB_MOD_MGMT].enable = true; tsModule[TSDB_MOD_MGMT].enable = true;
dnodeSetModuleStatus(TSDB_MOD_MGMT); dnodeSetModuleStatus(TSDB_MOD_MGMT);
(*tsModule[TSDB_MOD_MGMT].stopFp)(); (*tsModule[TSDB_MOD_MGMT].startFp)();
} }
if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) { if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) {
......
...@@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); ...@@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy(); bool dnodeIsFirstDeploy();
uint32_t dnodeGetMnodeMasteIp(); uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMnodeList();
int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERY_H
#define TDENGINE_QUERY_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void* qinfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param pVnode
* @param pQueryTableMsg
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/**
* Destroy QInfo object
*
* @param qinfo
* @return
*/
void qDestroyQueryInfo(qinfo_t qinfo);
/**
* the main query execution function, including query on both table and multitables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @return
*/
void qTableQuery(qinfo_t qinfo);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param qinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
/**
*
* Retrieve the actual results to fill the response message payload.
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
*
* @param qinfo qinfo object
* @param pRsp response message
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
/**
* Decide if more results will be produced or not
*
* @param qinfo
* @return
*/
bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERY_H
...@@ -584,6 +584,7 @@ typedef struct { ...@@ -584,6 +584,7 @@ typedef struct {
char dnodeName[TSDB_NODE_NAME_LEN + 1]; char dnodeName[TSDB_NODE_NAME_LEN + 1];
uint32_t privateIp; uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file uint16_t numOfTotalVnodes; // from config file
uint16_t openVnodes; uint16_t openVnodes;
......
...@@ -93,6 +93,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); ...@@ -93,6 +93,8 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(tsdb_repo_t *repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val);
int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
...@@ -159,11 +161,16 @@ typedef struct SDataBlockInfo { ...@@ -159,11 +161,16 @@ typedef struct SDataBlockInfo {
int32_t sid; int32_t sid;
} SDataBlockInfo; } SDataBlockInfo;
typedef struct {
size_t numOfTables;
SArray* pGroupList;
} STableGroupInfo;
typedef struct { typedef struct {
} SFields; } SFields;
#define TSDB_TS_GREATER_EQUAL 1 #define TSDB_TS_GREATER_EQUAL 1
#define TSDB_TS_LESS_EQUAL 2 #define TSDB_TS_LESS_EQUAL 2
typedef struct SQueryRowCond { typedef struct SQueryRowCond {
int32_t rel; int32_t rel;
...@@ -178,7 +185,7 @@ typedef void *tsdbpos_t; ...@@ -178,7 +185,7 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list * @param pTableList table sid list
* @return * @return
*/ */
tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t *tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, SArray *pColumnInfo);
/** /**
* move to next block * move to next block
...@@ -280,10 +287,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); ...@@ -280,10 +287,10 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
* *
*/ */
int32_t tsdbQueryTags(tsdb_repo_t *tsdb, int64_t uid, const char *pTagCond, size_t len, SArray **pGroupList, int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList,
SColIndex *pColIndex, int32_t numOfCols); SColIndex* pColIndex, int32_t numOfCols);
int32_t tsdbGetOneTableGroup(tsdb_repo_t *tsdb, int64_t uid, SArray **pGroupList); int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo);
/** /**
* clean up the query handle * clean up the query handle
......
...@@ -31,7 +31,6 @@ int32_t mgmtInitDnodes(); ...@@ -31,7 +31,6 @@ int32_t mgmtInitDnodes();
void mgmtCleanupDnodes(); void mgmtCleanupDnodes();
char* mgmtGetDnodeStatusStr(int32_t dnodeStatus); char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtMonitorDnodeModule(); void mgmtMonitorDnodeModule();
int32_t mgmtGetDnodesNum(); int32_t mgmtGetDnodesNum();
......
...@@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { ...@@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) {
static int32_t mgmtDnodeActionRestored() { static int32_t mgmtDnodeActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
uint32_t ip = inet_addr(tsPrivateIp); uint32_t ip = inet_addr(tsPrivateIp);
mgmtCreateDnode(ip); mgmtCreateDnode(ip);
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
...@@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp); pStatus->privateIp = htonl(pStatus->privateIp);
pStatus->publicIp = htonl(pStatus->publicIp); pStatus->publicIp = htonl(pStatus->publicIp);
pStatus->moduleStatus = htonl(pStatus->moduleStatus);
pStatus->lastReboot = htonl(pStatus->lastReboot); pStatus->lastReboot = htonl(pStatus->lastReboot);
pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
...@@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->diskAvailable = pStatus->diskAvailable; pDnode->diskAvailable = pStatus->diskAvailable;
pDnode->alternativeRole = pStatus->alternativeRole; pDnode->alternativeRole = pStatus->alternativeRole;
pDnode->totalVnodes = pStatus->numOfTotalVnodes; pDnode->totalVnodes = pStatus->numOfTotalVnodes;
pDnode->moduleStatus = pStatus->moduleStatus;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
...@@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtGetMnodeList(&pRsp->mnodes); mgmtGetMnodeList(&pRsp->mnodes);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); pRsp->dnodeState.moduleStatus = htonl((int32_t)pDnode->isMgmt);
pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000); pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000);
pRsp->dnodeState.numOfVnodes = 0; pRsp->dnodeState.numOfVnodes = 0;
...@@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) { ...@@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) {
pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM; pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM;
sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1); sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1);
if (pDnode->privateIp == inet_addr(tsMasterIp)) {
pDnode->moduleStatus |= (1 << TSDB_MOD_MGMT);
}
SSdbOperDesc oper = { SSdbOperDesc oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDnodeSdb, .table = tsDnodeSdb,
...@@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
return numOfRows; return numOfRows;
} }
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
uint32_t status = pDnode->moduleStatus & (1 << moduleType); uint32_t status = pDnode->moduleStatus & (1 << moduleType);
return status > 0; return status > 0;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "treplica.h" #include "treplica.h"
#include "tgrant.h" #include "tgrant.h"
#include "ttimer.h" #include "ttimer.h"
#include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -100,6 +101,10 @@ int32_t mgmtStartSystem() { ...@@ -100,6 +101,10 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (replicaInit() < 0) {
mError("failed to init replica")
}
if (mgmtInitDClient() < 0) { if (mgmtInitDClient() < 0) {
return -1; return -1;
} }
...@@ -108,10 +113,6 @@ int32_t mgmtStartSystem() { ...@@ -108,10 +113,6 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (replicaInit() < 0) {
mError("failed to init dnode balance")
}
grantReset(TSDB_GRANT_ALL, 0); grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true; tsMgmtIsRunning = true;
......
...@@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { ...@@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
SMnodeObj *pMnode = pOper->pObj; SMnodeObj *pMnode = pOper->pObj;
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
pDnode->isMgmt = false;
mgmtReleaseDnode(pDnode);
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId); mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s ...@@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash}; static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData}; static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
uint64_t sdbGetVersion() { return tsSdbObj->version; }
int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; } int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
uint64_t sdbGetVersion() {
if (tsSdbObj)
return tsSdbObj->version;
else
return 0;
}
static char *sdbGetActionStr(int32_t action) { static char *sdbGetActionStr(int32_t action) {
switch (action) { switch (action) {
case SDB_ACTION_INSERT: case SDB_ACTION_INSERT:
...@@ -147,10 +153,6 @@ void sdbCleanUp() { ...@@ -147,10 +153,6 @@ void sdbCleanUp() {
} }
} }
SSdbObject *sdbGetObj() {
return tsSdbObj;
}
void sdbIncRef(void *handle, void *pRow) { void sdbIncRef(void *handle, void *pRow) {
if (pRow) { if (pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
#include "dnode.h"
#include "mgmtDef.h" #include "mgmtDef.h"
#include "mgmtLog.h" #include "mgmtLog.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
...@@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { ...@@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
} }
static int32_t mgmtUserActionRestored() { static int32_t mgmtUserActionRestored() {
if (strcmp(tsMasterIp, tsPrivateIp) == 0) { if (dnodeIsFirstDeploy()) {
SAcctObj *pAcct = mgmtGetAcct("root"); SAcctObj *pAcct = mgmtGetAcct("root");
mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "root", "taosdata");
mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "monitor", tsInternalPass);
......
...@@ -47,7 +47,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *); ...@@ -47,7 +47,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
* *
*/ */
typedef struct tQueryInfo { typedef struct tQueryInfo {
int32_t offset; // offset value in tags
int32_t colIndex; // index of column in schema int32_t colIndex; // index of column in schema
uint8_t optr; // expression operator uint8_t optr; // expression operator
SSchema sch; // schema of tags SSchema sch; // schema of tags
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "hash.h" #include "hash.h"
#include "tsdb.h"
#include "qinterpolation.h" #include "qinterpolation.h"
#include "qresultBuf.h" #include "qresultBuf.h"
#include "qsqlparser.h" #include "qsqlparser.h"
...@@ -89,7 +90,7 @@ typedef struct SColumnFilterElem { ...@@ -89,7 +90,7 @@ typedef struct SColumnFilterElem {
} SColumnFilterElem; } SColumnFilterElem;
typedef struct SSingleColumnFilterInfo { typedef struct SSingleColumnFilterInfo {
SColumnInfoData info; SColumnInfoData info;
int32_t numOfFilters; int32_t numOfFilters;
SColumnFilterElem* pFilters; SColumnFilterElem* pFilters;
void* pData; void* pData;
...@@ -108,8 +109,6 @@ typedef struct STableQueryInfo { ...@@ -108,8 +109,6 @@ typedef struct STableQueryInfo {
} STableQueryInfo; } STableQueryInfo;
typedef struct STableDataInfo { typedef struct STableDataInfo {
int32_t numOfBlocks;
int32_t start; // start block index
int32_t tableIndex; int32_t tableIndex;
int32_t groupIdx; // group id in table list int32_t groupIdx; // group id in table list
STableQueryInfo* pTableQInfo; STableQueryInfo* pTableQInfo;
...@@ -171,7 +170,7 @@ typedef struct SQInfo { ...@@ -171,7 +170,7 @@ typedef struct SQInfo {
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
sem_t dataReady; sem_t dataReady;
SArray* pTableList; // table id list STableGroupInfo groupInfo; // table id list
void* tsdb; void* tsdb;
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
...@@ -187,50 +186,7 @@ typedef struct SQInfo { ...@@ -187,50 +186,7 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
STableDataInfo* pTableDataInfo;
TSKEY* tsList; TSKEY* tsList;
} SQInfo; } SQInfo;
/**
* create the qinfo object before adding the query task to each tsdb query worker
*
* @param pReadMsg
* @param pQInfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* destroy the query info struct
* @param pQInfo
*/
void qDestroyQueryInfo(SQInfo* pQInfo);
/**
* query on single table
* @param pReadMsg
*/
void qTableQuery(SQInfo* pQInfo);
/**
* wait for the query completed, and retrieve final results to client
* @param pQInfo
*/
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
/**
*
* @param pQInfo
* @param pRsp
* @return
*/
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
/**
*
* @param pQInfo
* @return
*/
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -544,7 +544,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty ...@@ -544,7 +544,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC); iter = tSkipListCreateIterFromVal(pSkipList, (char*) &pCond->end->v.i64Key, type, TSDB_ORDER_DESC);
} }
__compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type); __compar_fn_t func = getComparFunc(pSkipList->keyInfo.type, type, 0);
if (pCond->start != NULL) { if (pCond->start != NULL) {
int32_t optr = pCond->start->optr; int32_t optr = pCond->start->optr;
...@@ -569,7 +569,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty ...@@ -569,7 +569,7 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
if (comp) { if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key); ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key);
assert(ret <= 0); assert(ret >= 0);
} }
if (ret == 0 && optr == TSDB_RELATION_GREATER) { if (ret == 0 && optr == TSDB_RELATION_GREATER) {
...@@ -595,8 +595,8 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty ...@@ -595,8 +595,8 @@ static void tQueryOnSkipList(SSkipList* pSkipList, SQueryCond* pCond, int32_t ty
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
if (comp) { if (comp) {
ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->start->v.i64Key); ret = func(SL_GET_NODE_KEY(pSkipList, pNode), &pCond->end->v.i64Key);
assert(ret >= 0); assert(ret <= 0);
} }
if (ret == 0 && optr == TSDB_RELATION_LESS) { if (ret == 0 && optr == TSDB_RELATION_LESS) {
......
此差异已折叠。
...@@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { ...@@ -214,6 +214,23 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
} }
} }
int32_t tsdbGetTableTagVal(tsdb_repo_t* repo, STableId id, int32_t col, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id.uid);
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = schemaColAt(pSchema, col);
SDataRow row = (SDataRow)pTable->tagVal;
char* d = dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
*val = d;
*type = pCol->type;
*bytes = pCol->bytes;
return 0;
}
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
if (tsdbCheckTableCfg(pCfg) < 0) return -1; if (tsdbCheckTableCfg(pCfg) < 0) return -1;
......
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
#include "talgo.h" #include "talgo.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
#include "tcompare.h"
#include "../../../query/inc/qast.h" #include "../../../query/inc/qast.h" // todo move to common module
#include "../../../query/inc/qextbuffer.h" #include "../../../query/inc/tlosertree.h" // todo move to util module
#include "../../../query/inc/tlosertree.h"
#include "../../../query/inc/tsqlfunction.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbMain.h" #include "tsdbMain.h"
...@@ -143,7 +142,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -143,7 +142,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1; pCompBlockLoadInfo->fileListIndex = -1;
} }
tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, SArray* groupList, SArray* pColumnInfo) { tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query // todo 2. add the reference count for each table that is involved in query
...@@ -157,22 +156,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S ...@@ -157,22 +156,25 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
size_t size = taosArrayGetSize(groupList); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(size >= 1); assert(sizeOfGroup >= 1);
pQueryHandle->pTableCheckInfo = taosArrayInit(size, sizeof(STableCheckInfo)); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
for (int32_t i = 0; i < size; ++i) {
SArray* group = *(SArray**)taosArrayGet(groupList, i); for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
size_t gsize = taosArrayGetSize(group); size_t gsize = taosArrayGetSize(group);
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) { for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = *(STable**)taosArrayGet(group, j); SPair* d = (SPair*) taosArrayGet(group, j);
assert(pTable != NULL); assert(d->first != NULL);
STableCheckInfo info = { STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey, .lastKey = pQueryHandle->window.skey,
.tableId = pTable->tableId, .tableId = ((STable*) d->first)->tableId,
.pTableObj = pTable, .pTableObj = d->first,
}; };
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
...@@ -1143,7 +1145,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) { ...@@ -1143,7 +1145,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STable* t = *(STable**)SL_GET_NODE_DATA(pNode); STable* t = *(STable**)SL_GET_NODE_DATA(pNode);
taosArrayPush(list, t); taosArrayPush(list, &t);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1167,7 +1169,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) { ...@@ -1167,7 +1169,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STable* pTable = taosArrayGetP(pTableList, i); STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &pTable->tableId); taosArrayPush(pRes, &pTable);
} }
} }
...@@ -1181,24 +1183,20 @@ static void destroyHelper(void* param) { ...@@ -1181,24 +1183,20 @@ static void destroyHelper(void* param) {
free(param); free(param);
} }
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) { static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index) {
*index = 0; *index = 0;
*offset = 0;
// filter on table name(TBNAME) // filter on table name(TBNAME)
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) { if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
*index = TSDB_TBNAME_COLUMN_INDEX; *index = TSDB_TBNAME_COLUMN_INDEX;
*offset = TSDB_TBNAME_COLUMN_INDEX;
return; return;
} }
while ((*index) < pSupporter->numOfTags) { while ((*index) < pSupporter->numOfTags) {
if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes && if (pSupporter->pTagSchema[*index].bytes == pSchema->bytes &&
pSupporter->pTagSchema[*index].type == pSchema->type && pSupporter->pTagSchema[*index].type == pSchema->type &&
pSupporter->pTagSchema[*index].colId == pSchema->colId) { pSupporter->pTagSchema[*index].colId == pSchema->colId) {
break; break;
} else {
(*offset) += pSupporter->pTagSchema[(*index)++].bytes;
} }
} }
} }
...@@ -1219,15 +1217,14 @@ void filterPrepare(void* expr, void* param) { ...@@ -1219,15 +1217,14 @@ void filterPrepare(void* expr, void* param) {
tVariant* pCond = pExpr->_node.pRight->pVal; tVariant* pCond = pExpr->_node.pRight->pVal;
SSchema* pSchema = pExpr->_node.pLeft->pSchema; SSchema* pSchema = pExpr->_node.pLeft->pSchema;
getTagColumnInfo(pSupporter, pSchema, &i, &offset); getTagColumnInfo(pSupporter, pSchema, &i);
assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX)); assert((i >= 0 && i < TSDB_MAX_TAGS) || (i == TSDB_TBNAME_COLUMN_INDEX));
assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX)); assert((offset >= 0 && offset < TSDB_MAX_TAGS_LEN) || (offset == TSDB_TBNAME_COLUMN_INDEX));
pInfo->sch = *pSchema; pInfo->sch = *pSchema;
pInfo->colIndex = i; pInfo->colIndex = i;
pInfo->optr = pExpr->_node.optr; pInfo->optr = pExpr->_node.optr;
pInfo->offset = offset; pInfo->compare = getComparFunc(pSchema->type, pCond->nType, pInfo->optr);
// pInfo->compare = getFilterComparator(pSchema->type, pCond->nType, pInfo->optr);
tVariantAssign(&pInfo->q, pCond); tVariantAssign(&pInfo->q, pCond);
tVariantTypeSetType(&pInfo->q, pInfo->sch.type); tVariantTypeSetType(&pInfo->q, pInfo->sch.type);
...@@ -1306,26 +1303,33 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -1306,26 +1303,33 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
} }
void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
SArray* g = taosArrayInit(16, POINTER_BYTES); SArray* g = taosArrayInit(16, sizeof(SPair));
taosArrayPush(g, &pTables[0]);
SPair p = {.first = pTables[0]};
taosArrayPush(g, &p);
for (int32_t i = 1; i < numOfTables; ++i) { for (int32_t i = 1; i < numOfTables; ++i) {
int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp); int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp);
assert(ret == 0 || ret == -1); assert(ret == 0 || ret == -1);
if (ret == 0) { if (ret == 0) {
taosArrayPush(g, &pTables[i]); SPair p1 = {.first = pTables[i]};
taosArrayPush(g, &p1);
} else { } else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, POINTER_BYTES); g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTables[i]);
SPair p1 = {.first = pTables[i]};
taosArrayPush(g, &p1);
} }
} }
taosArrayPush(pGroups, &g);
} }
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
assert(pTableList != NULL && taosArrayGetSize(pTableList) > 0); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
...@@ -1335,7 +1339,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -1335,7 +1339,17 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
} }
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
taosArrayPush(pTableGroup, pTableList); size_t num = taosArrayGetSize(pTableList);
SArray* sa = taosArrayInit(num, sizeof(SPair));
for(int32_t i = 0; i < num; ++i) {
STable* pTable = taosArrayGetP(pTableList, i);
SPair p = {.first = pTable};
taosArrayPush(sa, &p);
}
taosArrayPush(pTableGroup, &sa);
pTrace("all %d tables belong to one group", size); pTrace("all %d tables belong to one group", size);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -1362,7 +1376,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -1362,7 +1376,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
bool tSkipListNodeFilterCallback(const void* pNode, void* param) { bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param; tQueryInfo* pInfo = (tQueryInfo*)param;
STable* pTable = (STable*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = dataRowTuple(pTable->tagVal); // todo not only the first column char* val = dataRowTuple(pTable->tagVal); // todo not only the first column
int8_t type = pInfo->sch.type; int8_t type = pInfo->sch.type;
...@@ -1419,7 +1433,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -1419,7 +1433,8 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)}; SExprTreeSupporter s = {.pTagSchema = schema, .numOfTags = schemaNCols(pSTable->tagSchema)};
SBinaryFilterSupp supp = { SBinaryFilterSupp supp = {
.fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s}; .fp = (__result_filter_fn_t)tSkipListNodeFilterCallback, .setupInfoFn = filterPrepare, .pExtInfo = &s,
};
SArray* pTableList = taosArrayInit(8, POINTER_BYTES); SArray* pTableList = taosArrayInit(8, POINTER_BYTES);
...@@ -1430,7 +1445,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -1430,7 +1445,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, SArray** pGroupList, int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) { SColIndex* pColIndex, int32_t numOfCols) {
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
...@@ -1448,9 +1463,9 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size ...@@ -1448,9 +1463,9 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
taosArrayDestroy(res); taosArrayDestroy(res);
return ret; return ret;
} }
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); pGroupInfo->numOfTables = taosArrayGetSize(res);
taosArrayDestroy(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
return ret; return ret;
} }
...@@ -1465,25 +1480,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size ...@@ -1465,25 +1480,27 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
} }
doQueryTableList(pSTable, res, pExprNode); doQueryTableList(pSTable, res, pExprNode);
*pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
taosArrayDestroy(res);
return ret; return ret;
} }
int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, SArray** pGroupList) { int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
//todo assert table type, add the table ref count //todo assert table type, add the table ref count
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
*pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES); SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable); taosArrayPush(group, &pTable);
taosArrayPush(*pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -34,31 +34,13 @@ typedef struct SPatternCompareInfo { ...@@ -34,31 +34,13 @@ typedef struct SPatternCompareInfo {
char matchOne; // symbol for match one wildcard, default: '_' char matchOne; // symbol for match one wildcard, default: '_'
} SPatternCompareInfo; } SPatternCompareInfo;
int32_t compareInt32Val(const void *pLeft, const void *pRight);
int32_t compareInt64Val(const void *pLeft, const void *pRight);
int32_t compareInt16Val(const void *pLeft, const void *pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight);
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight);
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight);
int32_t compareDoubleVal(const void *pLeft, const void *pRight);
int32_t compareStrVal(const void *pLeft, const void *pRight);
int32_t compareWStrVal(const void *pLeft, const void *pRight);
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo); int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo); int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
__compar_fn_t getKeyComparFunc(int32_t keyType); __compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType); __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -107,6 +107,11 @@ extern "C" { ...@@ -107,6 +107,11 @@ extern "C" {
#define POW2(x) ((x) * (x)) #define POW2(x) ((x) * (x))
typedef struct SPair {
void* first;
void* sec;
} SPair;
int32_t strdequote(char *src); int32_t strdequote(char *src);
void strtrim(char *src); void strtrim(char *src);
......
...@@ -40,25 +40,23 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) { ...@@ -40,25 +40,23 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
} }
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) { int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) {
// int64_t lhs = ((SSkipListKey *)pLeft)->i64Key; int64_t lhs = GET_INT64_VAL(pLeft);
// double rhs = ((SSkipListKey *)pRight)->dKey; double rhs = GET_DOUBLE_VAL(pRight);
// if (fabs(lhs - rhs) < FLT_EPSILON) { if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0; return 0;
// } else { } else {
// return (lhs > rhs) ? 1 : -1; return (lhs > rhs) ? 1 : -1;
// } }
return 0;
} }
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
// double lhs = ((SSkipListKey *)pLeft)->dKey; double lhs = GET_DOUBLE_VAL(pLeft);
// int64_t rhs = ((SSkipListKey *)pRight)->i64Key; int64_t rhs = GET_INT64_VAL(pRight);
// if (fabs(lhs - rhs) < FLT_EPSILON) { if (fabs(lhs - rhs) < FLT_EPSILON) {
// return 0; return 0;
// } else { } else {
// return (lhs > rhs) ? 1 : -1; return (lhs > rhs) ? 1 : -1;
// } }
return 0;
} }
int32_t compareDoubleVal(const void *pLeft, const void *pRight) { int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
...@@ -241,7 +239,8 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* ...@@ -241,7 +239,8 @@ static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void*
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
} }
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { // todo promote the type definition before the comparsion
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType, int32_t optr) {
__compar_fn_t comparFn = NULL; __compar_fn_t comparFn = NULL;
switch (type) { switch (type) {
...@@ -250,10 +249,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { ...@@ -250,10 +249,15 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
// assert(type == filterDataType);
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) { if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
comparFn = compareInt64Val; comparFn = compareInt64Val;
break; } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareIntDoubleVal;
} }
break;
} }
case TSDB_DATA_TYPE_BOOL: { case TSDB_DATA_TYPE_BOOL: {
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
...@@ -265,22 +269,37 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) { ...@@ -265,22 +269,37 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
} }
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) { if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
// comparFn = compareDoubleIntVal; comparFn = compareDoubleIntVal;
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) { } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
// comparFn = compareDoubleVal;
// }
if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
comparFn = compareDoubleVal; comparFn = compareDoubleVal;
} }
break; break;
} }
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY: {
comparFn = compareStrVal; assert(filterDataType == TSDB_DATA_TYPE_BINARY);
if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
comparFn = compareStrPatternComp;
} else { /* normal relational comparFn */
comparFn = compareStrVal;
}
break; break;
case TSDB_DATA_TYPE_NCHAR: }
comparFn = compareWStrVal;
case TSDB_DATA_TYPE_NCHAR: {
assert(filterDataType == TSDB_DATA_TYPE_NCHAR);
if (optr == TSDB_RELATION_LIKE) {
comparFn = compareWStrPatternComp;
} else {
comparFn = compareWStrVal;
}
break; break;
}
default: default:
comparFn = compareInt32Val; comparFn = compareInt32Val;
break; break;
......
...@@ -110,12 +110,7 @@ short tsDaysPerFile = 10; ...@@ -110,12 +110,7 @@ short tsDaysPerFile = 10;
int tsDaysToKeep = 3650; int tsDaysToKeep = 3650;
int tsReplications = TSDB_REPLICA_MIN_NUM; int tsReplications = TSDB_REPLICA_MIN_NUM;
#ifdef _MPEER
int tsNumOfMPeers = 3; int tsNumOfMPeers = 3;
#else
int tsNumOfMPeers = 1;
#endif
int tsMaxShellConns = 2000; int tsMaxShellConns = 2000;
int tsMaxTables = 100000; int tsMaxTables = 100000;
...@@ -556,7 +551,7 @@ static void doInitGlobalConfig() { ...@@ -556,7 +551,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
32, 4096, 0, TSDB_CFG_UTYPE_NONE); 32, 4096, 0, TSDB_CFG_UTYPE_NONE);
#ifdef _MPEER #ifdef _SYNC
tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
1, 3, 0, TSDB_CFG_UTYPE_NONE); 1, 3, 0, TSDB_CFG_UTYPE_NONE);
......
...@@ -308,7 +308,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) { ...@@ -308,7 +308,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
pSkipList->state.queryCount++; pSkipList->state.queryCount++;
#endif #endif
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType); __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, keyType, 0);
int32_t ret = -1; int32_t ret = -1;
for (int32_t i = sLevel; i >= 0; --i) { for (int32_t i = sLevel; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, i);
...@@ -372,7 +372,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* ...@@ -372,7 +372,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
int32_t ret = -1; int32_t ret = -1;
__compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type); __compar_fn_t filterComparFn = getComparFunc(pSkipList->keyInfo.type, type, 0);
SSkipListNode* pNode = pSkipList->pHead; SSkipListNode* pNode = pSkipList->pHead;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) { for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "dataformat.h" #include "dataformat.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "queryExecutor.h" #include "query.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
...@@ -54,7 +54,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -54,7 +54,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQInfo* pQInfo = NULL; qinfo_t pQInfo = NULL;
if (contLen != 0) { if (contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode); void* tsdb = vnodeGetTsdb(pVnode);
pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} #define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);}
typedef struct { typedef struct {
uint64_t version;
int fd; int fd;
int keep; int keep;
int level; int level;
...@@ -50,7 +51,7 @@ int wDebugFlag = 135; ...@@ -50,7 +51,7 @@ int wDebugFlag = 135;
static uint32_t walSignature = 0xFAFBFDFE; static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(const char *path); static int walHandleExistingFiles(const char *path);
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp); static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
static int walRemoveWalFiles(const char *path); static int walRemoveWalFiles(const char *path);
void *walOpen(const char *path, const SWalCfg *pCfg) { void *walOpen(const char *path, const SWalCfg *pCfg) {
...@@ -68,6 +69,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -68,6 +69,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
if (access(path, F_OK) != 0) mkdir(path, 0755); if (access(path, F_OK) != 0) mkdir(path, 0755);
if (pCfg->keep == 1) return pWal;
if (walHandleExistingFiles(path) == 0) if (walHandleExistingFiles(path) == 0)
walRenew(pWal); walRenew(pWal);
...@@ -154,6 +157,7 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -154,6 +157,7 @@ int walWrite(void *handle, SWalHead *pHead) {
// no wal // no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0;
pHead->signature = walSignature; pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
...@@ -162,7 +166,9 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -162,7 +166,9 @@ int walWrite(void *handle, SWalHead *pHead) {
if(write(pWal->fd, pHead, contLen) != contLen) { if(write(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
code = -1; code = -1;
} } else {
pWal->version = pHead->version;
}
return code; return code;
} }
...@@ -184,7 +190,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -184,7 +190,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN+5]; char opath[TSDB_FILENAME_LEN+5];
sprintf(opath, "%s/old", pWal->path);
int slen = sprintf(opath, "%s", pWal->path);
if ( pWal->keep == 0)
strcpy(opath+slen, "/old");
// is there old directory? // is there old directory?
if (access(opath, F_OK)) return 0; if (access(opath, F_OK)) return 0;
...@@ -199,6 +208,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -199,6 +208,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
} }
} }
if (count == 0) return 0;
if ( count != (maxId-minId+1) ) { if ( count != (maxId-minId+1) ) {
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
code = -1; code = -1;
...@@ -206,17 +217,29 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -206,17 +217,29 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
wTrace("wal:%s, %d files will be restored", opath, count); wTrace("wal:%s, %d files will be restored", opath, count);
for (index = minId; index<=maxId; ++index) { for (index = minId; index<=maxId; ++index) {
sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index); sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
code = walRestoreWalFile(pWal->name, pVnode, writeFp); code = walRestoreWalFile(pWal, pVnode, writeFp);
if (code < 0) break; if (code < 0) break;
} }
} }
if (code == 0) { if (code == 0) {
code = walRemoveWalFiles(opath); if (pWal->keep == 0) {
if (code == 0) { code = walRemoveWalFiles(opath);
if (remove(opath) < 0) { if (code == 0) {
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); if (remove(opath) < 0) {
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
code = -1;
}
}
} else {
// open the existing WAL file in append mode
pWal->num = count;
pWal->id = maxId;
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, maxId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
code = -1; code = -1;
} }
} }
...@@ -252,8 +275,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { ...@@ -252,8 +275,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
return code; return code;
} }
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) { static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
int code = 0; int code = 0;
char *name = pWal->name;
char *buffer = malloc(1024000); // size for one record char *buffer = malloc(1024000); // size for one record
if (buffer == NULL) return -1; if (buffer == NULL) return -1;
...@@ -289,10 +313,11 @@ static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) ...@@ -289,10 +313,11 @@ static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp)
break; break;
} }
// write into queue if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
} }
close(fd);
free(buffer); free(buffer);
return code; return code;
...@@ -365,4 +390,3 @@ static int walRemoveWalFiles(const char *path) { ...@@ -365,4 +390,3 @@ static int walRemoveWalFiles(const char *path) {
return code; return code;
} }
...@@ -29,8 +29,6 @@ int writeToQueue(void *pVnode, void *data, int type) { ...@@ -29,8 +29,6 @@ int writeToQueue(void *pVnode, void *data, int type) {
ver = pHead->version; ver = pHead->version;
walWrite(pWal, pHead); walWrite(pWal, pHead);
free(pHead);
return 0; return 0;
} }
...@@ -42,6 +40,7 @@ int main(int argc, char *argv[]) { ...@@ -42,6 +40,7 @@ int main(int argc, char *argv[]) {
int total = 5; int total = 5;
int rows = 10000; int rows = 10000;
int size = 128; int size = 128;
int keep = 0;
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
...@@ -52,6 +51,8 @@ int main(int argc, char *argv[]) { ...@@ -52,6 +51,8 @@ int main(int argc, char *argv[]) {
level = atoi(argv[++i]); level = atoi(argv[++i]);
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) { } else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
rows = atoi(argv[++i]); rows = atoi(argv[++i]);
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
keep = atoi(argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
total = atoi(argv[++i]); total = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) { } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
...@@ -67,6 +68,7 @@ int main(int argc, char *argv[]) { ...@@ -67,6 +68,7 @@ int main(int argc, char *argv[]) {
printf(" [-l level]: log level, default is:%d\n", level); printf(" [-l level]: log level, default is:%d\n", level);
printf(" [-t total]: total wal files, default is:%d\n", total); printf(" [-t total]: total wal files, default is:%d\n", total);
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows); printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
printf(" [-k keep]: keep the wal after closing, default is:%d\n", keep);
printf(" [-v version]: initial version, default is:%ld\n", ver); printf(" [-v version]: initial version, default is:%ld\n", ver);
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag); printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
printf(" [-h help]: print out this help\n\n"); printf(" [-h help]: print out this help\n\n");
...@@ -79,6 +81,7 @@ int main(int argc, char *argv[]) { ...@@ -79,6 +81,7 @@ int main(int argc, char *argv[]) {
SWalCfg walCfg; SWalCfg walCfg;
walCfg.commitLog = level; walCfg.commitLog = level;
walCfg.wals = max; walCfg.wals = max;
walCfg.keep = keep;
pWal = walOpen(path, &walCfg); pWal = walOpen(path, &walCfg);
if (pWal == NULL) { if (pWal == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册