提交 5d4f826b 编写于 作者: H hjxilinx

[TD-32] fix bugs in get table meta (parsing query sql)

上级 d1828544
......@@ -190,7 +190,7 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
STableMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
......
......@@ -20,18 +20,19 @@
extern "C" {
#endif
#include <tarray.h>
#include "os.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "taosdef.h"
#include "trpc.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#include "qsqltype.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
......@@ -68,8 +69,8 @@ typedef struct STableMeta {
int16_t sversion;
int8_t numOfVpeers;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid;
int32_t vgid;
int32_t sid;
uint64_t uid;
// if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
......
......@@ -465,7 +465,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -514,7 +514,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->isParseFinish) {
tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
......@@ -529,7 +529,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
} else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
......
......@@ -164,7 +164,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
pSupporter->interval = pQueryInfo->intervalTime;
pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
pSupporter->uid = pTableMetaInfo->pTableMeta->uid;
assert (pSupporter->uid != 0);
......@@ -712,7 +712,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
// STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
// int32_t idx = pSql->cmd.vnodeIdx;
......
......@@ -77,7 +77,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pTableMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
......
......@@ -656,7 +656,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd,
int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableInfo tinfo = tscGetTableInfo(pTableMeta);
......@@ -1155,7 +1155,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
STableMeta *pTableMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SSchema * pSchema = tscGetTableSchema(pTableMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
......@@ -1252,7 +1252,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
// set the next sent data vnode index in data block arraylist
pTableMetaInfo->vnodeIndex = 1;
......@@ -1355,7 +1355,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd;
assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
......@@ -1388,7 +1388,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t code = 0;
int nrows = 0;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableInfo tinfo = tscGetTableInfo(pTableMeta);
......@@ -1482,7 +1482,7 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
}
STableDataBlocks *pDataBlock = NULL;
STableMetaInfo * pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -407,7 +407,7 @@ static int insertStmtReset(STscStmt* pStmt) {
}
pCmd->batchSize = 0;
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pTableMetaInfo->vnodeIndex = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -421,7 +421,7 @@ static int insertStmtExecute(STscStmt* stmt) {
++pCmd->batchSize;
}
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (pCmd->pDataBlocks->nSize > 0) {
......
......@@ -896,7 +896,7 @@ bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
......@@ -969,7 +969,7 @@ bool validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg6 = "invalid column length";
assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
......@@ -2129,7 +2129,7 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) {
int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
pCmd->command = TSDB_SQL_SHOW;
......
......@@ -155,9 +155,25 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->tableInfo = (STableInfo){.numOfTags = pTableMetaMsg->numOfTags, .numOfColumns = pTableMetaMsg->numOfColumns,
.precision = pTableMetaMsg->precision};
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgid = pTableMetaMsg->vgid;
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
// pTableMeta->tableId = pTableMetaMsg->tableId;
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
if (size != NULL) {
*size = sizeof(STableMeta) + schemaSize;
}
......
......@@ -319,7 +319,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
......@@ -920,7 +920,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
}
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
......@@ -1273,7 +1273,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
pQueryInfo->limit.offset = pLocalReducer->offset;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
......
......@@ -236,7 +236,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if (rpcMsg->pCont == NULL) {
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
} else {
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
......@@ -1125,7 +1125,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj* pParentSql = trsupport->pParentSqlObj;
SSqlObj* pSql = (SSqlObj *)tres;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
int32_t idx = pTableMetaInfo->vnodeIndex;
......@@ -1231,7 +1231,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
//SShellSubmitMsg *pShellMsg;
//char * pMsg;
//STableMetaInfo * pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
//STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
//STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -1272,7 +1272,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
//TODO
// SSqlCmd * pCmd = &pSql->cmd;
// STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
//
// char * pStart = buf + tsRpcHeadSize;
// SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
......@@ -1320,7 +1320,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
}
static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vnodeId, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
......@@ -1677,7 +1677,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMCreateDbMsg *pCreateDbMsg = (SCMCreateDbMsg*)pCmd->payload;
assert(pCmd->numOfClause == 1);
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strncpy(pCreateDbMsg->db, pTableMetaInfo->name, tListLen(pCreateDbMsg->db));
return TSDB_CODE_SUCCESS;
......@@ -1799,7 +1799,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropDbMsg *pDropDbMsg = (SCMDropDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strncpy(pDropDbMsg->db, pTableMetaInfo->name, tListLen(pDropDbMsg->db));
pDropDbMsg->ignoreNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
......@@ -1817,7 +1817,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMDropTableMsg *pDropTableMsg = (SCMDropTableMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropTableMsg->tableId, pTableMetaInfo->name);
pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
......@@ -1834,7 +1834,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDrop->ip, pTableMetaInfo->name);
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
......@@ -1852,7 +1852,7 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMDropUserMsg *pDropMsg = (SCMDropUserMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pTableMetaInfo->name);
return TSDB_CODE_SUCCESS;
......@@ -1868,7 +1868,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMUseDbMsg *pUseDbMsg = (SCMUseDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pUseDbMsg->db, pTableMetaInfo->name);
pCmd->msgType = TSDB_MSG_TYPE_CM_USE_DB;
......@@ -1888,7 +1888,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMShowMsg *pShowMsg = (SCMShowMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
size_t nameLen = strlen(pTableMetaInfo->name);
if (nameLen > 0) {
strcpy(pShowMsg->db, pTableMetaInfo->name); // prefix is set here
......@@ -2099,7 +2099,7 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMAlterDbMsg *pAlterDbMsg = (SCMAlterDbMsg*)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pAlterDbMsg->db, pTableMetaInfo->name);
return TSDB_CODE_SUCCESS;
......@@ -2174,7 +2174,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
int tscProcessDescribeTableRsp(SSqlObj *pSql) {
SSqlCmd * pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -2405,7 +2405,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(int16_t);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, i);
pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
offset = pMsg - (char *)pMetaMsg;
......@@ -2556,49 +2556,46 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return msgLen;
}
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
STableMetaMsg *pMeta;
SSchema * pSchema;
int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
pMeta = (STableMetaMsg *)pSql->res.pRsp;
pMetaMsg->sid = htonl(pMetaMsg->sid);
pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->vgid = htonl(pMetaMsg->vgid);
pMetaMsg->uid = htobe64(pMetaMsg->uid);
pMetaMsg->contLen = htons(pMetaMsg->contLen);
pMeta->sid = htonl(pMeta->sid);
pMeta->sversion = htons(pMeta->sversion);
pMeta->vgid = htonl(pMeta->vgid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->contLen = htons(pMeta->contLen);
if (pMeta->sid < 0 || pMeta->vgid < 0) {
tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
if (pMetaMsg->sid < 0 || pMetaMsg->vgid < 0) {
tscError("invalid meter vgid:%d, sid%d", pMetaMsg->vgid, pMetaMsg->sid);
return TSDB_CODE_INVALID_VALUE;
}
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
tscError("invalid numOfTags:%d", pMeta->numOfTags);
if (pMetaMsg->numOfTags > TSDB_MAX_TAGS || pMetaMsg->numOfTags < 0) {
tscError("invalid numOfTags:%d", pMetaMsg->numOfTags);
return TSDB_CODE_INVALID_VALUE;
}
if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
tscError("invalid numOfColumns:%d", pMetaMsg->numOfColumns);
return TSDB_CODE_INVALID_VALUE;
}
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMeta->vpeerDesc[i].vnode = htonl(pMeta->vpeerDesc[i].vnode);
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
}
int32_t rowSize = 0;
pSchema = (SSchema *)(pSql->res.pRsp + sizeof(STableMeta));
SSchema* pSchema = pMetaMsg->schema;
int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
// ignore the tags length
if (i < pMeta->numOfColumns) {
if (i < pMetaMsg->numOfColumns) {
rowSize += pSchema->bytes;
}
pSchema++;
......@@ -2607,29 +2604,32 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
// rsp += numOfTotalCols * sizeof(SSchema);
//
// int32_t tagLen = 0;
// SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
// SSchema *pTagsSchema = tscGetTableTagSchema(pMetaMsg);
//
// if (pMeta->tableType == TSDB_CHILD_TABLE) {
// for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
// if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
// for (int32_t i = 0; i < pMetaMsg->numOfTags; ++i) {
// tagLen += pTagsSchema[i].bytes;
// }
// }
//
// rsp += tagLen;
// int32_t size = (int32_t)(rsp - (char *)pMeta);
// int32_t size = (int32_t)(rsp - (char *)pMetaMsg);
// pMeta->index = rand() % TSDB_VNODES_SUPPORT;
// pMeta->index = 0;
size_t size = 0;
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size);
// todo add one more function: taosAddDataIfNotExists();
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
pTableMetaInfo->pTableMeta = (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, (char *)pMeta,
pMeta->contLen, tsMeterMetaKeepTimer);
pTableMetaInfo->pTableMeta = (STableMeta *)taosCachePut(tscCacheHandle, pTableMetaInfo->name, pTableMeta,
size, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pTableMetaInfo->pTableMeta == NULL) return 0;
if (pTableMetaInfo->pTableMeta == NULL) {
return 0;
}
free(pTableMeta);
return TSDB_CODE_OTHERS;
}
......@@ -2956,7 +2956,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
int tscProcessUseDbRsp(SSqlObj *pSql) {
STscObj * pObj = pSql->pTscObj;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
strcpy(pObj->db, pTableMetaInfo->name);
return 0;
......@@ -2968,7 +2968,7 @@ int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
}
int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
if (pTableMeta == NULL) {
......@@ -2995,7 +2995,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
}
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
STableMeta *pTableMeta = taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
if (pTableMeta == NULL) { /* not in cache, abort */
......@@ -3378,7 +3378,7 @@ void tscInitMsgs() {
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessMeterMetaRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
......
......@@ -146,7 +146,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(&pStream->pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearMeterMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -172,7 +172,7 @@ static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
SSqlStream * pStream = (SSqlStream *)param;
SSqlObj * pSql = (SSqlObj *)res;
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
......
......@@ -175,7 +175,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
return 0;
}
STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
int numOfTables = 0;
if (!UTIL_METER_IS_NOMRAL_METER(pTableMetaInfo)) {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
......@@ -385,7 +385,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->vnodeIndex = 0;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vnodeIndex = 0;
}
tscDoQuery(pSql);
......
......@@ -584,7 +584,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
pCmd->numOfTablesInSubmit = pDataBlock->numOfTables;
assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
......@@ -1747,7 +1747,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
* @param tableIndex denote the table index for join query, where more than one table exists
* @return
*/
STableMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, int32_t tableIndex) {
if (pCmd == NULL || pCmd->numOfClause == 0) {
return NULL;
}
......@@ -1957,7 +1957,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, tableIndex);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
......@@ -2079,7 +2079,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pTableMeta, pMetricMeta, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
......
......@@ -84,7 +84,7 @@ typedef struct {
} SVnodeGid;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
int8_t dirty;
uint64_t uid;
......@@ -128,7 +128,7 @@ typedef struct {
} SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
int8_t dirty;
uint64_t uid;
......
......@@ -229,7 +229,6 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vgId;
int32_t vnode; //the index of vnode
uint32_t ip;
} SVnodeDesc;
......@@ -256,14 +255,14 @@ typedef struct {
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
char superTableId[TSDB_TABLE_ID_LEN];
char data[];
} SMDCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
int8_t igExists;
int16_t numOfTags;
int16_t numOfColumns;
......@@ -274,13 +273,13 @@ typedef struct {
} SCMCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t igNotExists;
} SCMDropTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
int16_t type; /* operation type */
char tagVal[TSDB_MAX_BYTES_PER_ROW];
int8_t numOfCols; /* number of schema */
......
......@@ -451,7 +451,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg
pMeta->numOfColumns = htons(pTable->superTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
strcpy(pMeta->tableId, pTable->tableId);
strncpy(pMeta->tableId, pTable->tableId, tListLen(pTable->tableId));
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
......@@ -464,7 +464,6 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
......
......@@ -531,6 +531,8 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaM
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
strncpy(pMeta->tableId, pTable->tableId, tListLen(pTable->tableId));
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
......@@ -542,8 +544,8 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaM
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
......
......@@ -610,9 +610,10 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcRsp.pCont = pMeta;
rpcRsp.contLen = pMeta->contLen;
pMeta->contLen = htons(pMeta->contLen);
}
rpcSendResponse(&rpcRsp);
......
......@@ -514,7 +514,6 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].vgId = htonl(pVgroup->vgId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].ip);
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
}
......
......@@ -98,15 +98,15 @@ SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
* @param keepTime survival time in second
* @return cached element
*/
void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
void *taosCachePut(SCacheObj *pCacheObj, char *key, void *pData, size_t dataSize, int keepTimeInSeconds);
/**
* get data from cache
* @param handle cache object
* @param pCacheObj cache object
* @param key key
* @return cached data or NULL
*/
void *taosCacheAcquireByName(void *handle, char *key);
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
......@@ -118,7 +118,7 @@ void *taosCacheAcquireByName(void *handle, char *key);
* @param data
* @return
*/
void *taosCacheAcquireByData(void *handle, void *data);
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data);
/**
* transfer the ownership of data in cache to another object without increasing reference count.
......@@ -126,7 +126,7 @@ void *taosCacheAcquireByData(void *handle, void *data);
* @param data
* @return
*/
void *taosCacheTransfer(void *handle, void **data);
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data);
/**
* remove data in cache, the data will not be removed immediately.
......@@ -136,7 +136,7 @@ void *taosCacheTransfer(void *handle, void **data);
* @param _remove force model, reduce the ref count and move the data into
* pTrash
*/
void taosCacheRelease(void *handle, void **data, bool _remove);
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
......
......@@ -22,43 +22,43 @@
#include "ttimer.h"
#include "tutil.h"
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
#if defined(LINUX)
pthread_rwlock_wrlock(&pObj->lock);
pthread_rwlock_wrlock(&pCacheObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
pthread_mutex_lock(&pCacheObj->lock);
#endif
}
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pCacheObj) {
#if defined(LINUX)
pthread_rwlock_rdlock(&pObj->lock);
pthread_rwlock_rdlock(&pCacheObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
pthread_mutex_lock(&pCacheObj->lock);
#endif
}
static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
#if defined(LINUX)
pthread_rwlock_unlock(&pObj->lock);
pthread_rwlock_unlock(&pCacheObj->lock);
#else
pthread_mutex_unlock(&pObj->lock);
pthread_mutex_unlock(&pCacheObj->lock);
#endif
}
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
#if defined(LINUX)
return pthread_rwlock_init(&pObj->lock, NULL);
return pthread_rwlock_init(&pCacheObj->lock, NULL);
#else
return pthread_mutex_init(&pObj->lock, NULL);
return pthread_mutex_init(&pCacheObj->lock, NULL);
#endif
}
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) {
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#if defined(LINUX)
pthread_rwlock_destroy(&pObj->lock);
pthread_rwlock_destroy(&pCacheObj->lock);
#else
pthread_mutex_destroy(&pObj->lock);
pthread_mutex_destroy(&pCacheObj->lock);
#endif
}
......@@ -105,10 +105,10 @@ static SCacheDataNode *taosCreateHashNode(const char *key, size_t keyLen, const
/**
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
* It will be removed until the pNode->refCount == 0
* @param pObj Cache object
* @param pCacheObj Cache object
* @param pNode Cache slot object
*/
static void taosAddToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->inTrash) { /* node is already in trash */
return;
}
......@@ -116,31 +116,31 @@ static void taosAddToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->next = pObj->pTrash;
if (pObj->pTrash) {
pObj->pTrash->prev = pElem;
pElem->next = pCacheObj->pTrash;
if (pCacheObj->pTrash) {
pCacheObj->pTrash->prev = pElem;
}
pElem->prev = NULL;
pObj->pTrash = pElem;
pCacheObj->pTrash = pElem;
pNode->inTrash = true;
pObj->numOfElemsInTrash++;
pCacheObj->numOfElemsInTrash++;
pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash);
pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash);
}
static void taosRemoveFromTrash(SCacheObj *pObj, STrashElem *pElem) {
static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
pError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pObj->numOfElemsInTrash--;
pCacheObj->numOfElemsInTrash--;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { /* pnode is the header, update header */
pObj->pTrash = pElem->next;
pCacheObj->pTrash = pElem->next;
}
if (pElem->next) {
......@@ -154,24 +154,24 @@ static void taosRemoveFromTrash(SCacheObj *pObj, STrashElem *pElem) {
/**
* remove nodes in trash with refCount == 0 in cache
* @param pNode
* @param pObj
* @param pCacheObj
* @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed
*/
static void taosTrashEmpty(SCacheObj *pObj, bool force) {
__cache_wr_lock(pObj);
static void taosTrashEmpty(SCacheObj *pCacheObj, bool force) {
__cache_wr_lock(pCacheObj);
if (pObj->numOfElemsInTrash == 0) {
if (pObj->pTrash != NULL) {
pError("key:inconsistency data in cache, numOfElem in trash:%d", pObj->numOfElemsInTrash);
if (pCacheObj->numOfElemsInTrash == 0) {
if (pCacheObj->pTrash != NULL) {
pError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash);
}
pObj->pTrash = NULL;
pCacheObj->pTrash = NULL;
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
return;
}
STrashElem *pElem = pObj->pTrash;
STrashElem *pElem = pCacheObj->pTrash;
while (pElem) {
T_REF_VAL_CHECK(pElem->pData);
......@@ -181,51 +181,51 @@ static void taosTrashEmpty(SCacheObj *pObj, bool force) {
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
pObj->numOfElemsInTrash - 1);
pCacheObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
pElem = pElem->next;
taosRemoveFromTrash(pObj, p);
taosRemoveFromTrash(pCacheObj, p);
} else {
pElem = pElem->next;
}
}
assert(pObj->numOfElemsInTrash >= 0);
__cache_unlock(pObj);
assert(pCacheObj->numOfElemsInTrash >= 0);
__cache_unlock(pCacheObj);
}
/**
* release node
* @param pObj cache object
* @param pCacheObj cache object
* @param pNode data node
*/
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SCacheDataNode *pNode) {
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
pError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
return;
}
int32_t size = pNode->size;
taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->totalSize, size);
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pCacheObj->totalSize, size);
free(pNode);
}
/**
* move the old node into trash
* @param pObj
* @param pCacheObj
* @param pNode
*/
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
taosAddToTrash(pObj, pNode);
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
taosAddToTrash(pCacheObj, pNode);
}
/**
* update data in cache
* @param pObj
* @param pCacheObj
* @param pNode
* @param key
* @param keyLen
......@@ -233,7 +233,7 @@ static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pObj, SCacheDataNode *p
* @param dataSize
* @return
*/
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNode, char *key, int32_t keyLen,
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pCacheObj, SCacheDataNode *pNode, char *key, int32_t keyLen,
void *pData, uint32_t dataSize, uint64_t duration) {
SCacheDataNode *pNewNode = NULL;
......@@ -260,9 +260,9 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNod
T_REF_INC(pNewNode);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
} else {
taosCacheMoveToTrash(pObj, pNode);
taosCacheMoveToTrash(pCacheObj, pNode);
pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNewNode == NULL) {
......@@ -272,7 +272,7 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNod
T_REF_INC(pNewNode);
// addedTime new element to hashtable
taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
}
return pNewNode;
......@@ -283,12 +283,12 @@ static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNod
* @param key
* @param pData
* @param size
* @param pObj
* @param pCacheObj
* @param keyLen
* @param pNode
* @return
*/
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, size_t keyLen, const void *pData,
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, char *key, size_t keyLen, const void *pData,
size_t dataSize, uint64_t duration) {
SCacheDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNode == NULL) {
......@@ -296,70 +296,70 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *ke
}
T_REF_INC(pNode);
taosHashPut(pObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
return pNode;
}
static void doCleanupDataCache(SCacheObj *pObj) {
__cache_wr_lock(pObj);
static void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj);
if (taosHashGetSize(pObj->pHashTable) > 0) {
taosHashCleanup(pObj->pHashTable);
if (taosHashGetSize(pCacheObj->pHashTable) > 0) {
taosHashCleanup(pCacheObj->pHashTable);
}
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
taosTrashEmpty(pObj, true);
__cache_lock_destroy(pObj);
taosTrashEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj);
memset(pObj, 0, sizeof(SCacheObj));
free(pObj);
memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj);
}
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static void taosCacheRefresh(void *handle, void *tmrId) {
SCacheObj *pObj = (SCacheObj *)handle;
SCacheObj *pCacheObj = (SCacheObj *)handle;
if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
pTrace("object is destroyed. no refresh retry");
return;
}
if (pObj->deleting == 1) {
doCleanupDataCache(pObj);
if (pCacheObj->deleting == 1) {
doCleanupDataCache(pCacheObj);
return;
}
uint64_t expiredTime = taosGetTimestampMs();
pObj->statistics.refreshCount++;
pCacheObj->statistics.refreshCount++;
SHashMutableIterator *pIter = taosHashCreateIter(pObj->pHashTable);
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pObj);
__cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) {
if (pObj->deleting == 1) {
if (pCacheObj->deleting == 1) {
taosHashDestroyIter(pIter);
break;
}
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pObj, pNode);
taosCacheReleaseNode(pCacheObj, pNode);
}
}
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
taosHashDestroyIter(pIter);
if (pObj->deleting == 1) { // clean up resources and abort
doCleanupDataCache(pObj);
if (pCacheObj->deleting == 1) { // clean up resources and abort
doCleanupDataCache(pCacheObj);
} else {
taosTrashEmpty(pObj, false);
taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
taosTrashEmpty(pCacheObj, false);
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
}
}
......@@ -368,104 +368,100 @@ SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
return NULL;
}
SCacheObj *pObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
if (pObj == NULL) {
SCacheObj *pCacheObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
if (pCacheObj == NULL) {
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
pObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
if (pObj->pHashTable == NULL) {
free(pObj);
pCacheObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
if (pCacheObj->pHashTable == NULL) {
free(pCacheObj);
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
// set free cache node callback function for hash table
taosHashSetFreecb(pObj->pHashTable, taosFreeNode);
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pObj->refreshTime = refreshTime * 1000;
pObj->tmrCtrl = tmrCtrl;
pCacheObj->refreshTime = refreshTime * 1000;
pCacheObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer);
if (__cache_lock_init(pObj) != 0) {
taosTmrStopA(&pObj->pTimer);
taosHashCleanup(pObj->pHashTable);
free(pObj);
if (__cache_lock_init(pCacheObj) != 0) {
taosTmrStopA(&pCacheObj->pTimer);
taosHashCleanup(pCacheObj->pHashTable);
free(pCacheObj);
pError("failed to init lock, reason:%s", strerror(errno));
return NULL;
}
return pObj;
return pCacheObj;
}
void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int duration) {
void *taosCachePut(SCacheObj *pCacheObj, char *key, void *pData, size_t dataSize, int duration) {
SCacheDataNode *pNode;
SCacheObj * pObj;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->pHashTable == NULL) {
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL) {
return NULL;
}
size_t keyLen = strlen(key);
__cache_wr_lock(pObj);
SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
__cache_wr_lock(pCacheObj);
SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
if (pOld == NULL) { // do addedTime to cache
pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, duration * 1000L);
pNode = taosAddToCacheImpl(pCacheObj, key, keyLen, pData, dataSize, duration * 1000L);
if (NULL != pNode) {
pObj->totalSize += pNode->size;
pCacheObj->totalSize += pNode->size;
pTrace("key:%s %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", total:%d, size:%" PRId64 " bytes",
key, pNode, pNode->addedTime, pNode->expiredTime, pObj->totalSize, dataSize);
key, pNode, pNode->addedTime, pNode->expiredTime, pCacheObj->totalSize, dataSize);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
pTrace("key:%s %p exist in cache, updated", key, pNode);
}
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
return (pNode != NULL) ? pNode->data : NULL;
}
void *taosCacheAcquireByName(void *handle, char *key) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL;
}
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pObj);
__cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
}
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pObj->statistics.missCount, 1);
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key);
}
atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void *taosCacheAcquireByData(void *handle, void *data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
......@@ -483,9 +479,8 @@ void *taosCacheAcquireByData(void *handle, void *data) {
return data;
}
void *taosCacheTransfer(void *handle, void **data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
if (pCacheObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
......@@ -505,9 +500,8 @@ void *taosCacheTransfer(void *handle, void **data) {
return d;
}
void taosCacheRelease(void *handle, void **data, bool _remove) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || (*data) == NULL || (taosHashGetSize(pObj->pHashTable) + pObj->numOfElemsInTrash == 0)) {
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
if (pCacheObj == NULL || (*data) == NULL || (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
return;
}
......@@ -523,13 +517,13 @@ void taosCacheRelease(void *handle, void **data, bool _remove) {
*data = NULL;
if (_remove) {
__cache_wr_lock(pObj);
__cache_wr_lock(pCacheObj);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
T_REF_DEC(pNode);
taosCacheMoveToTrash(pObj, pNode);
taosCacheMoveToTrash(pCacheObj, pNode);
__cache_unlock(pObj);
__cache_unlock(pCacheObj);
} else {
T_REF_DEC(pNode);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册