未验证 提交 9b6c8a30 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1624 from taosdata/feature/query

Feature/query
...@@ -28,12 +28,12 @@ extern "C" { ...@@ -28,12 +28,12 @@ extern "C" {
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h" #include "tutil.h"
#include "tarray.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
...@@ -61,10 +61,8 @@ typedef struct STableMeta { ...@@ -61,10 +61,8 @@ typedef struct STableMeta {
//super table if it is created according to super table, otherwise, tableInfo is used //super table if it is created according to super table, otherwise, tableInfo is used
union { struct STableMeta* pSTable; STableComInfo tableInfo; }; union { struct STableMeta* pSTable; STableComInfo tableInfo; };
uint8_t tableType; uint8_t tableType;
int8_t numOfVpeers;
int16_t sversion; int16_t sversion;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SCMVgroupInfo vgroupInfo;
int32_t vgId; // virtual group id, which current table belongs to
int32_t sid; // the index of one table in a virtual node int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
...@@ -77,7 +75,7 @@ typedef struct STableMetaInfo { ...@@ -77,7 +75,7 @@ typedef struct STableMetaInfo {
* 1. keep the vnode index during the multi-vnode super table projection query * 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion * 2. keep the vnode index for multi-vnode insertion
*/ */
int32_t dnodeIndex; int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name char name[TSDB_TABLE_ID_LEN]; // (super) table name
int16_t numOfTags; // total required tags in query, including groupby tags int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
......
...@@ -45,9 +45,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -45,9 +45,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->fp = fp; pSql->fp = fp;
pSql->param = param;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload"); tscError("failed to malloc payload");
...@@ -406,31 +407,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -406,31 +407,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pSql->fp == (void *)1) {
pSql->fp = NULL;
if (code != 0) {
pRes->code = code;
tscTrace("%p failed to renew tableMeta", pSql);
// tsem_post(&pSql->rspSem);
} else {
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
tscGetTableMeta(pSql, pTableMetaInfo);
code = tscSendMsgToServer(pSql);
if (code != 0) {
pRes->code = code;
// tsem_post(&pSql->rspSem);
}
}
return;
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRes->code = code; pRes->code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
...@@ -443,12 +419,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -443,12 +419,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->dnodeIndex >= 0 && pSql->param != NULL); assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj; SSqlObj * pParObj = trs->pParentSqlObj;
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->dnodeIndex && assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
tscTrace("%p get metricMeta during super table query successfully", pSql); tscTrace("%p get metricMeta during super table query successfully", pSql);
......
...@@ -699,7 +699,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st ...@@ -699,7 +699,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
dataBuf->vgId = pTableMeta->vgId; dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
dataBuf->numOfTables = 1; dataBuf->numOfTables = 1;
/* /*
......
...@@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) { ...@@ -408,7 +408,7 @@ static int insertStmtReset(STscStmt* pStmt) {
pCmd->batchSize = 0; pCmd->batchSize = 0;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pTableMetaInfo->dnodeIndex = 0; pTableMetaInfo->vgroupIndex = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -438,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) {
} }
// set the next sent data vnode index in data block arraylist // set the next sent data vnode index in data block arraylist
pTableMetaInfo->dnodeIndex = 1; pTableMetaInfo->vgroupIndex = 1;
} else { } else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
} }
......
...@@ -168,10 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size ...@@ -168,10 +168,8 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgId = pTableMetaMsg->vgId; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize); memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
......
...@@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr ...@@ -636,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
for (int32_t i = 0; i < numOfSubs; ++i) { for (int32_t i = 0; i < numOfSubs; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
......
...@@ -47,12 +47,12 @@ static int32_t minMsgSize() { return tsRpcHeadSize + 100; } ...@@ -47,12 +47,12 @@ static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) { static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
SRpcIpSet* pIpList = &pSql->ipList; SRpcIpSet* pIpList = &pSql->ipList;
pIpList->numOfIps = pTableMeta->numOfVpeers; pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps;
pIpList->port = tsDnodeShellPort; pIpList->port = tsDnodeShellPort;
pIpList->inUse = 0; pIpList->inUse = 0;
for(int32_t i = 0; i < pTableMeta->numOfVpeers; ++i) { for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) {
pIpList->ip[i] = pTableMeta->vpeerDesc[i].ip; pIpList->ip[i] = pTableMeta->vgroupInfo.ipAddr[i].ip;
} }
} }
...@@ -270,7 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -270,7 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
} else { } else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; // todo move away
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
...@@ -327,7 +326,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -327,7 +326,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
pMsg->code = htonl(pMsg->code); pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows); pMsg->numOfRows = htonl(pMsg->numOfRows);
...@@ -512,12 +511,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -512,12 +511,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) { if (UTIL_TABLE_IS_SUPERTABLE(pQueryInfo->pTableMetaInfo[0])) {
SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList; SVgroupsInfo* pVgroupInfo = pQueryInfo->pTableMetaInfo[0]->vgroupList;
assert(pVgroupInfo->dnodeVgroups->numOfVgroups == 1); // todo fix me assert(pVgroupInfo->numOfVgroups == 1); // todo fix me
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->dnodeVgroups[0].vgId[0]); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[0].vgId);
} else { } else {
STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta; STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[0]->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
} }
pMsg += sizeof(SRetrieveTableMsg); pMsg += sizeof(SRetrieveTableMsg);
...@@ -543,8 +542,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -543,8 +542,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SMsgDesc); pMsg += sizeof(SMsgDesc);
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
int32_t vgId = pTableMeta->vgroupInfo.vgId;
pShellMsg->header.vgId = htonl(pTableMeta->vgId); pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size); pShellMsg->header.contLen = htonl(size);
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
...@@ -554,7 +554,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -554,7 +554,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, pTableMeta); tscSetDnodeIpList(pSql, pTableMeta);
tscTrace("%p build submit msg, vgId:%d numOfVnodes:%d", pSql, pTableMeta->vgId, htonl(pMsgDesc->numOfVnodes)); tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -580,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { ...@@ -580,7 +580,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
#if 0 #if 0
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids; int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(STableIdInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg); int32_t outputColumnSize = pQueryInfo->exprsInfo.numOfExprs * sizeof(SSqlFuncExprMsg);
...@@ -648,12 +648,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -648,12 +648,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1; numOfTables = 1;
tscSetDnodeIpList(pSql, pTableMeta); tscSetDnodeIpList(pSql, pTableMeta);
pQueryMsg->head.vgId = htonl(pTableMeta->vgId); pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
} else { // query super table } else { // query super table
if (pTableMetaInfo->dnodeIndex < 0) { if (pTableMetaInfo->vgroupIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->dnodeIndex); tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vgroupIndex);
return -1; return -1;
} }
...@@ -662,11 +662,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -662,11 +662,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->ipList.inUse = 0; pSql->ipList.inUse = 0;
// todo extract method // todo extract method
STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[pTableMetaInfo->dnodeIndex]; SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[pTableMetaInfo->vgroupIndex];
pSql->ipList.ip[0] = pVgroupInfo->ipAddr.ip; pSql->ipList.ip[0] = pVgroupInfo->ipAddr[0].ip;
#if 0 #if 0
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->dnodeIndex); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vgroupIndex);
uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;
numOfTables = pVnodeSidList->numOfSids; numOfTables = pVnodeSidList->numOfSids;
...@@ -676,10 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -676,10 +676,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
#endif #endif
tscTrace("%p query on super table, numOfVgroup:%d, dnodeIndex:%d", pSql, pVgroupInfo->numOfVgroups, tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups,
pTableMetaInfo->dnodeIndex); pTableMetaInfo->vgroupIndex);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId[0]); pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
numOfTables = 1; numOfTables = 1;
} }
...@@ -857,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -857,7 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
if (pQueryInfo->tsBuf != NULL) { if (pQueryInfo->tsBuf != NULL) {
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->dnodeIndex); STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pQueryInfo->tsBuf, pTableMetaInfo->vgroupIndex);
assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent assert(QUERY_IS_JOIN_QUERY(pQueryInfo->type) && pBlockInfo != NULL); // this query should not be sent
// todo refactor // todo refactor
...@@ -1828,13 +1828,15 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1828,13 +1828,15 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->sid = htonl(pMetaMsg->sid);
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->vgId = htonl(pMetaMsg->vgId);
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->uid = htobe64(pMetaMsg->uid);
pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->contLen = htons(pMetaMsg->contLen);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) { if (pMetaMsg->sid < 0 || pMetaMsg->vgroup.numOfIps < 0) {
tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid); tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgroup.numOfIps, pMetaMsg->sid);
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
...@@ -1848,9 +1850,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1848,9 +1850,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
return TSDB_CODE_INVALID_VALUE; return TSDB_CODE_INVALID_VALUE;
} }
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
pMetaMsg->vpeerDesc[i].ip = htonl(pMetaMsg->vpeerDesc[i].ip); pMetaMsg->vgroup.ipAddr[i].ip = htonl(pMetaMsg->vgroup.ipAddr[i].ip);
pMetaMsg->vpeerDesc[i].dnodeId = htonl(pMetaMsg->vpeerDesc[i].dnodeId); pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
assert(pMetaMsg->vgroup.ipAddr[i].ip != 0);
} }
SSchema* pSchema = pMetaMsg->schema; SSchema* pSchema = pMetaMsg->schema;
...@@ -1859,6 +1863,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1859,6 +1863,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for (int i = 0; i < numOfTotalCols; ++i) { for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId); pSchema->colId = htons(pSchema->colId);
assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
pSchema++; pSchema++;
} }
...@@ -1899,9 +1905,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1899,9 +1905,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
* |...... 1B 1B 4B * |...... 1B 1B 4B
**/ **/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
// uint8_t ieType;
// int32_t totalNum;
// int32_t i;
#if 0 #if 0
char *rsp = pSql->res.pRsp; char *rsp = pSql->res.pRsp;
...@@ -1958,7 +1961,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -1958,7 +1961,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
// return TSDB_CODE_OTHERS; // return TSDB_CODE_OTHERS;
// } // }
// //
// for (int j = 0; j < TSDB_VNODES_SUPPORT; ++j) { // for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
// pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode); // pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
// } // }
// //
...@@ -2116,7 +2119,7 @@ _error_clean: ...@@ -2116,7 +2119,7 @@ _error_clean:
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp; SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pRes->pRsp;
pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes); pStableVgroup->numOfVgroups = htonl(pStableVgroup->numOfVgroups);
// master sqlObj locates in param // master sqlObj locates in param
SSqlObj* parent = pSql->param; SSqlObj* parent = pSql->param;
...@@ -2128,14 +2131,14 @@ _error_clean: ...@@ -2128,14 +2131,14 @@ _error_clean:
pInfo->vgroupList = malloc(pRes->rspLen); pInfo->vgroupList = malloc(pRes->rspLen);
memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen); memcpy(pInfo->vgroupList, pStableVgroup, pRes->rspLen);
for(int32_t i = 0; i < pInfo->vgroupList->numOfDnodes; ++i) { for(int32_t i = 0; i < pInfo->vgroupList->numOfVgroups; ++i) {
STableDnodeVgroupInfo* pVgroups = &pInfo->vgroupList->dnodeVgroups[i]; SCMVgroupInfo* pVgroups = &pInfo->vgroupList->vgroups[i];
pVgroups->numOfVgroups = htonl(pVgroups->numOfVgroups); pVgroups->numOfIps = htonl(pVgroups->numOfIps);
pVgroups->ipAddr.ip = htonl(pVgroups->ipAddr.ip); pVgroups->vgId = htonl(pVgroups->vgId);
pVgroups->ipAddr.port = htons(pVgroups->ipAddr.port);
for(int32_t j = 0; j < pVgroups->numOfVgroups; ++j) { for(int32_t j = 0; j < tListLen(pVgroups->ipAddr); ++j) {
pVgroups->vgId[j] = htonl(pVgroups->vgId[j]); pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip);
pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
} }
} }
......
...@@ -129,6 +129,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -129,6 +129,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pObj->pSql = pSql; pObj->pSql = pSql;
......
...@@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -382,7 +382,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type; pQueryInfo->type = type;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->dnodeIndex = 0; tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
} }
tscDoQuery(pSql); tscDoQuery(pSql);
......
...@@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { ...@@ -341,8 +341,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, dnodeIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
} }
...@@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -457,7 +457,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->dnodeIndex); tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf); tsBufDestory(pBuf);
} }
...@@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -478,9 +478,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
// for projection query, need to try next vnode // for projection query, need to try next vnode
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0; int32_t totalVnode = 0;
if ((++pTableMetaInfo->dnodeIndex) < totalVnode) { if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotal); pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
...@@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -542,7 +542,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
assert(pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted // for projection query, need to try next vnode if current vnode is exhausted
// if ((++pTableMetaInfo->dnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { // if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
// pSupporter->pState->numOfCompleted = 0; // pSupporter->pState->numOfCompleted = 0;
// pSupporter->pState->numOfTotal = 1; // pSupporter->pState->numOfTotal = 1;
// //
...@@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -609,7 +609,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && // if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
// (!tscHasReachLimitation(pQueryInfo, pRes))) { // (!tscHasReachLimitation(pQueryInfo, pRes))) {
// numOfFetch++; // numOfFetch++;
// } // }
...@@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -647,8 +647,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pRes1->row >= pRes1->numOfRows) { if (pRes1->row >= pRes1->numOfRows) {
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, dnodeIndex:%d", pSql, pSql1, tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vgroupIndex:%d", pSql, pSql1,
pSupporter->subqueryIndex, pTableMetaInfo->dnodeIndex); pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);
tscResetForNextRetrieve(pRes1); tscResetForNextRetrieve(pRes1);
pSql1->fp = joinRetrieveCallback; pSql1->fp = joinRetrieveCallback;
...@@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -785,11 +785,11 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/** /**
* if the query is a continue query (dnodeIndex > 0 for projection query) for next vnode, do the retrieval of * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker * data instead of returning to its invoker
*/ */
if (pTableMetaInfo->dnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// assert(pTableMetaInfo->dnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); // assert(pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data pSql->fp = joinRetrieveCallback; // continue retrieve data
...@@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu ...@@ -897,14 +897,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
...@@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1005,7 +1005,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfDnodes; pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
assert(pSql->numOfSubs > 0); assert(pSql->numOfSubs > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
...@@ -1241,7 +1241,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1241,7 +1241,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
pTableMetaInfo->vgroupList->dnodeVgroups[0].ipAddr.ip, pTableMetaInfo->vgroupList->dnodeVgroups[0].vgId[0], pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].ip, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
...@@ -1401,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu ...@@ -1401,9 +1401,9 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1); assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1);
// launch subquery for each vnode, so the subquery index equals to the dnodeIndex. // launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
pTableMetaInfo->dnodeIndex = trsupport->subqueryIndex; pTableMetaInfo->vgroupIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew; pSql->pSubs[trsupport->subqueryIndex] = pNew;
} }
...@@ -1421,7 +1421,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1421,7 +1421,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableDnodeVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->dnodeVgroups[0]; SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
...@@ -1459,7 +1459,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1459,7 +1459,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) { if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pVgroupInfo->vgId[0], trsupport->subqueryIndex); trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex);
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
...@@ -1475,12 +1475,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1475,12 +1475,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex, pState->code); pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex, pState->code);
tscHandleSubqueryError(param, tres, pState->code); tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode } else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
pVgroupInfo->ipAddr.ip, pVgroupInfo->vgId[0], trsupport->subqueryIndex); pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex);
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
} }
......
...@@ -171,17 +171,49 @@ void taos_init_imp() { ...@@ -171,17 +171,49 @@ void taos_init_imp() {
if(0 == tscEmbedded){ if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int64_t refreshTime = tsMetricMetaKeepTimer < tsMeterMetaKeepTimer ? tsMetricMetaKeepTimer : tsMeterMetaKeepTimer; int64_t refreshTime = tsMetricMetaKeepTimer < tsMeterMetaKeepTimer ? tsMetricMetaKeepTimer : tsMeterMetaKeepTimer;
refreshTime = refreshTime > 2 ? 2 : refreshTime; refreshTime = refreshTime > 2 ? 2 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime; refreshTime = refreshTime < 1 ? 1 : refreshTime;
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
}
tscTrace("client is initialized successfully"); tscTrace("client is initialized successfully");
} }
void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscCacheHandle != NULL) {
taosCacheCleanup(tscCacheHandle);
}
if (tscQhandle != NULL) {
taosCleanUpScheduler(tscQhandle);
tscQhandle = NULL;
}
taosCloseLogger();
if (pVnodeConn != NULL) {
rpcClose(pVnodeConn);
pVnodeConn = NULL;
}
if (pTscMgmtConn != NULL) {
rpcClose(pTscMgmtConn);
pTscMgmtConn = NULL;
}
if (tsGlobalConfig != NULL) {
tfree(tsGlobalConfig);
}
taosTmrCleanUp(tscTmr);
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) { static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
SGlobalConfig *cfg = NULL; SGlobalConfig *cfg = NULL;
......
...@@ -1937,9 +1937,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1937,9 +1937,9 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo->numOfTags = numOfTags; pTableMetaInfo->numOfTags = numOfTags;
if (vgroupList != NULL) { if (vgroupList != NULL) {
assert(vgroupList->numOfDnodes == 1); // todo fix me assert(vgroupList->numOfVgroups == 1); // todo fix me
size_t size = sizeof(SVgroupsInfo) + (sizeof(STableDnodeVgroupInfo) +
vgroupList->dnodeVgroups[0].numOfVgroups * sizeof(int32_t)) * vgroupList->numOfDnodes; size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
pTableMetaInfo->vgroupList = malloc(size); pTableMetaInfo->vgroupList = malloc(size);
memcpy(pTableMetaInfo->vgroupList, vgroupList, size); memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
...@@ -2020,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2020,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
free(pNew); free(pNew);
return NULL; return NULL;
...@@ -2064,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2064,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, dnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->dnodeIndex); tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex);
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
} }
...@@ -2155,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2155,13 +2155,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscTrace( tscTrace(
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->dnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
} else { } else {
tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->dnodeIndex); tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
} }
return pNew; return pNew;
...@@ -2258,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { ...@@ -2258,7 +2258,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && // return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->dnodeIndex < totalVnode - 1); // (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1);
} }
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
...@@ -2277,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2277,9 +2277,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
int32_t totalVnode = 0; int32_t totalVnode = 0;
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; // int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
while (++pTableMetaInfo->dnodeIndex < totalVnode) { while (++pTableMetaInfo->vgroupIndex < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->dnodeIndex - 1, pTableMetaInfo->dnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause);
/* /*
* update the limit and offset value for the query on the next vnode, * update the limit and offset value for the query on the next vnode,
...@@ -2298,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { ...@@ -2298,7 +2298,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
pTableMetaInfo->dnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
/* /*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries. * For project query with super table join, the numOfSub is equalled to the number of all subqueries.
......
...@@ -52,6 +52,7 @@ typedef struct taosField { ...@@ -52,6 +52,7 @@ typedef struct taosField {
#endif #endif
DLL_EXPORT void taos_init(); DLL_EXPORT void taos_init();
DLL_EXPORT void taos_cleanup();
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos); DLL_EXPORT void taos_close(TAOS *taos);
......
...@@ -674,7 +674,7 @@ typedef struct { ...@@ -674,7 +674,7 @@ typedef struct {
} SSuperTableMetaMsg; } SSuperTableMetaMsg;
typedef struct { typedef struct {
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; SVnodeDesc vpeerDesc[TSDB_REPLICA_MAX_NUM];
int16_t index; // used locally int16_t index; // used locally
int32_t numOfSids; int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableIdInfo int32_t pSidExtInfoList[]; // offset value of STableIdInfo
......
...@@ -71,7 +71,7 @@ extern void* shellLoopQuery(void* arg); ...@@ -71,7 +71,7 @@ extern void* shellLoopQuery(void* arg);
extern void taos_error(TAOS* con); extern void taos_error(TAOS* con);
extern int regex_match(const char* s, const char* reg, int cflags); extern int regex_match(const char* s, const char* reg, int cflags);
void shellReadCommand(TAOS* con, char command[]); void shellReadCommand(TAOS* con, char command[]);
void shellRunCommand(TAOS* con, char* command); int32_t shellRunCommand(TAOS* con, char* command);
void shellRunCommandOnServer(TAOS* con, char command[]); void shellRunCommandOnServer(TAOS* con, char command[]);
void read_history(); void read_history();
void write_history(); void write_history();
......
...@@ -166,10 +166,10 @@ void shellReplaceCtrlChar(char *str) { ...@@ -166,10 +166,10 @@ void shellReplaceCtrlChar(char *str) {
*pstr = '\0'; *pstr = '\0';
} }
void shellRunCommand(TAOS *con, char *command) { int32_t shellRunCommand(TAOS *con, char *command) {
/* If command is empty just return */ /* If command is empty just return */
if (regex_match(command, "^[ \t;]*$", REG_EXTENDED)) { if (regex_match(command, "^[ \t;]*$", REG_EXTENDED)) {
return; return 0;
} }
/* Update the history vector. */ /* Update the history vector. */
...@@ -193,11 +193,11 @@ void shellRunCommand(TAOS *con, char *command) { ...@@ -193,11 +193,11 @@ void shellRunCommand(TAOS *con, char *command) {
if (regex_match(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) { if (regex_match(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
taos_close(con); taos_close(con);
write_history(); write_history();
exitShell(); return -1;
} else if (regex_match(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) { } else if (regex_match(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
// If clear the screen. // If clear the screen.
system("clear"); system("clear");
return; return 0;
} else if (regex_match(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) { } else if (regex_match(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
/* If source file. */ /* If source file. */
char *c_ptr = strtok(command, " ;"); char *c_ptr = strtok(command, " ;");
...@@ -209,6 +209,8 @@ void shellRunCommand(TAOS *con, char *command) { ...@@ -209,6 +209,8 @@ void shellRunCommand(TAOS *con, char *command) {
} else { } else {
shellRunCommandOnServer(con, command); shellRunCommandOnServer(con, command);
} }
return 0;
} }
void shellRunCommandOnServer(TAOS *con, char command[]) { void shellRunCommandOnServer(TAOS *con, char command[]) {
......
...@@ -295,6 +295,7 @@ void *shellLoopQuery(void *arg) { ...@@ -295,6 +295,7 @@ void *shellLoopQuery(void *arg) {
tscError("failed to malloc command"); tscError("failed to malloc command");
return NULL; return NULL;
} }
while (1) { while (1) {
// Read command from shell. // Read command from shell.
...@@ -304,9 +305,14 @@ void *shellLoopQuery(void *arg) { ...@@ -304,9 +305,14 @@ void *shellLoopQuery(void *arg) {
reset_terminal_mode(); reset_terminal_mode();
// Run the command // Run the command
shellRunCommand(con, command); if (shellRunCommand(con, command) != 0) {
break;
}
} }
tfree(command);
exitShell();
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
return NULL; return NULL;
...@@ -487,6 +493,7 @@ void showOnScreen(Command *cmd) { ...@@ -487,6 +493,7 @@ void showOnScreen(Command *cmd) {
void cleanup_handler(void *arg) { tcsetattr(0, TCSANOW, &oldtio); } void cleanup_handler(void *arg) { tcsetattr(0, TCSANOW, &oldtio); }
void exitShell() { void exitShell() {
tcsetattr(0, TCSANOW, &oldtio); /*int32_t ret =*/ tcsetattr(STDIN_FILENO, TCSANOW, &oldtio);
taos_cleanup();
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
...@@ -95,7 +95,7 @@ int main(int argc, char* argv[]) { ...@@ -95,7 +95,7 @@ int main(int argc, char* argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
/* Interupt handler. */ /* Interrupt handler. */
struct sigaction act; struct sigaction act;
memset(&act, 0, sizeof(struct sigaction)); memset(&act, 0, sizeof(struct sigaction));
......
...@@ -732,7 +732,6 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -732,7 +732,6 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
// pTable->mem->numOfPoints++;
return 0; return 0;
} }
......
...@@ -101,8 +101,6 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) { ...@@ -101,8 +101,6 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
if (pNode->next) { if (pNode->next) {
(pNode->next)->prev = pNode; (pNode->next)->prev = pNode;
} }
pTrace("key:%s %p update hash table", pNode->key, pNode);
} }
/** /**
...@@ -153,18 +151,18 @@ static void taosHashTableResize(SHashObj *pHashObj) { ...@@ -153,18 +151,18 @@ static void taosHashTableResize(SHashObj *pHashObj) {
SHashNode *pNode = NULL; SHashNode *pNode = NULL;
SHashNode *pNext = NULL; SHashNode *pNext = NULL;
int32_t newSize = pHashObj->capacity << 1U; int32_t newSize = pHashObj->capacity << 1u;
if (newSize > HASH_MAX_CAPACITY) { if (newSize > HASH_MAX_CAPACITY) {
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", // pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
pHashObj->capacity, HASH_MAX_CAPACITY); // pHashObj->capacity, HASH_MAX_CAPACITY);
return; return;
} }
int64_t st = taosGetTimestampUs(); // int64_t st = taosGetTimestampUs();
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize); SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
if (pNewEntry == NULL) { if (pNewEntry == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); // pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return; return;
} }
...@@ -230,10 +228,9 @@ static void taosHashTableResize(SHashObj *pHashObj) { ...@@ -230,10 +228,9 @@ static void taosHashTableResize(SHashObj *pHashObj) {
} }
} }
int64_t et = taosGetTimestampUs(); // int64_t et = taosGetTimestampUs();
// pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
} }
/** /**
......
...@@ -562,7 +562,7 @@ void taosTmrCleanUp(void* handle) { ...@@ -562,7 +562,7 @@ void taosTmrCleanUp(void* handle) {
pthread_mutex_unlock(&tmrCtrlMutex); pthread_mutex_unlock(&tmrCtrlMutex);
if (numOfTmrCtrl <=0) { if (numOfTmrCtrl <=0) {
pthread_cancel(athread); // pthread_cancel(athread);
for (int i = 0; i < tListLen(wheels); i++) { for (int i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i; time_wheel_t* wheel = wheels + i;
......
...@@ -75,6 +75,7 @@ int main(int argc, char *argv[]) { ...@@ -75,6 +75,7 @@ int main(int argc, char *argv[]) {
doQuery(taos, "create database if not exists test"); doQuery(taos, "create database if not exists test");
doQuery(taos, "use test"); doQuery(taos, "use test");
doQuery(taos, "insert into tm99 values('2020-01-01 1:1:1', 99);");
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); // doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册