未验证 提交 519b6ca0 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #11305 from taosdata/feature/qnode

Feature/qnode
......@@ -512,6 +512,7 @@ typedef struct {
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
int32_t ttl;
int8_t walLevel;
int8_t precision; // time resolution
int8_t compression;
......@@ -521,6 +522,7 @@ typedef struct {
int8_t cacheLastRow;
int8_t ignoreExist;
int8_t streamMode;
int8_t singleSTable;
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
} SCreateDbReq;
......@@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
} SDbCfgReq;
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
typedef struct {
int32_t numOfVgroups;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRows;
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
int32_t ttl;
int8_t walLevel;
int8_t precision;
int8_t compression;
int8_t replications;
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int8_t streamMode;
int8_t singleSTable;
} SDbCfgRsp;
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
typedef struct {
int32_t rowNum;
} SQnodeListReq;
......
......@@ -155,6 +155,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
......@@ -77,6 +77,8 @@ typedef struct SDbVgVersion {
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SDbVgVersion;
typedef SDbCfgRsp SDbCfgInfo;
int32_t catalogInit(SCatalogCfg *cfg);
/**
......@@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
/**
* Destroy catalog and relase all resources
......
......@@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1;
if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->precision) < 0) return -1;
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
......@@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1;
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
if (tEncodeI8(&encoder, pReq->singleSTable) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
......@@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
......@@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->singleSTable) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
if (pReq->pRetensions == NULL) {
......@@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray);
}
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->cacheBlockSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->totalBlocks) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->commitTime) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->fsyncPeriod) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->quorum) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->update) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->cacheBlockSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->totalBlocks) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->commitTime) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->fsyncPeriod) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->quorum) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
......@@ -146,6 +146,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
// Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
......
......@@ -260,6 +260,7 @@ typedef struct {
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
int32_t ttl;
int8_t walLevel;
int8_t precision;
int8_t compression;
......@@ -268,6 +269,7 @@ typedef struct {
int8_t update;
int8_t cacheLastRow;
int8_t streamMode;
int8_t singleSTable;
int32_t numOfRetensions;
SArray* pRetensions;
} SDbCfg;
......
......@@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
int32_t mndInitDb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DB,
......@@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
......@@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->minRows > pCfg->maxRows) return -1;
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
if (pCfg->ttl < TSDB_MIN_DB_TTL_OPTION && pCfg->ttl != TSDB_DEFAULT_DB_TTL_OPTION) return -1;
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
......@@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1;
if (pCfg->singleSTable < TSDB_MIN_DB_SINGLE_STABLE_OPTION || pCfg->streamMode > TSDB_MAX_DB_SINGLE_STABLE_OPTION) return -1;
return TSDB_CODE_SUCCESS;
}
......@@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
if (pCfg->ttl < 0) pCfg->ttl = TSDB_DEFAULT_DB_TTL_OPTION;
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION;
if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL;
......@@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE;
if (pCfg->singleSTable < 0) pCfg->singleSTable = TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
}
......@@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
.maxRows = pCreate->maxRows,
.commitTime = pCreate->commitTime,
.fsyncPeriod = pCreate->fsyncPeriod,
.ttl = pCreate->ttl,
.walLevel = pCreate->walLevel,
.precision = pCreate->precision,
.compression = pCreate->compression,
......@@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
.update = pCreate->update,
.cacheLastRow = pCreate->cacheLastRow,
.streamMode = pCreate->streamMode,
.singleSTable = pCreate->singleSTable,
};
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
......@@ -730,6 +738,71 @@ ALTER_DB_OVER:
return code;
}
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SDbObj *pDb = NULL;
SDbCfgReq cfgReq = {0};
SDbCfgRsp cfgRsp = {0};
if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto GET_DB_CFG_OVER;
}
pDb = mndAcquireDb(pMnode, cfgReq.db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto GET_DB_CFG_OVER;
}
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize;
cfgRsp.totalBlocks = pDb->cfg.totalBlocks;
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
cfgRsp.minRows = pDb->cfg.minRows;
cfgRsp.maxRows = pDb->cfg.maxRows;
cfgRsp.commitTime = pDb->cfg.commitTime;
cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod;
cfgRsp.ttl = pDb->cfg.ttl;
cfgRsp.walLevel = pDb->cfg.walLevel;
cfgRsp.precision = pDb->cfg.precision;
cfgRsp.compression = pDb->cfg.compression;
cfgRsp.replications = pDb->cfg.replications;
cfgRsp.quorum = pDb->cfg.quorum;
cfgRsp.update = pDb->cfg.update;
cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow;
cfgRsp.streamMode = pDb->cfg.streamMode;
cfgRsp.singleSTable = pDb->cfg.singleSTable;
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto GET_DB_CFG_OVER;
}
tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp);
pReq->pRsp = pRsp;
pReq->rspLen = contLen;
GET_DB_CFG_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr());
}
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pDb);
if (pRedoRaw == NULL) return -1;
......@@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.ttl;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.singleSTable;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.streamMode;
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char *status = "ready";
STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status));
cols++;
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
......
......@@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = {
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
// {.name = "update", .bytes = 1, .type =
// TSDB_DATA_TYPE_TINYINT}, // disable update
};
......
......@@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *dbFName, SDbCfgInfo *out) {
char *msg = NULL;
int32_t msgLen = 0;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)]((void *)dbFName, &msg, 0, &msgLen);
if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code);
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GET_DB_CFG,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rpcRsp.code), dbFName);
CTG_ERR_RET(rpcRsp.code);
}
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)](out, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process get db cfg rsp failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code);
}
ctgDebug("Got db cfg from mnode, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCtg->dbCache) {
*exist = 0;
......@@ -2137,7 +2175,6 @@ _return:
CTG_RET(code);
}
int32_t catalogInit(SCatalogCfg *cfg) {
if (gCtgMgmt.pCluster) {
qError("catalog already initialized");
......@@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num)
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
}
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
}
void catalogDestroy(void) {
qInfo("start to destroy catalog");
......
......@@ -618,7 +618,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......
......@@ -209,6 +209,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
assert(pDispatcher->queryEnd);
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;
pOutput->queryEnd = pDispatcher->queryEnd;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
......
......@@ -7289,8 +7289,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode->pExpr;
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
SBlockOrderInfo bi = {0};
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
......
......@@ -255,6 +255,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo;
static int32_t cost = 0;
pOperator->cost.openCost = ++cost;
pOperator->cost.totalCost = ++cost;
pOperator->resultInfo.totalRows = ++cost;
return pOperator;
}
......
......@@ -21,7 +21,7 @@
#include "taoserror.h"
#include "thash.h"
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>",
char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "-", "&", "|", ">", ">=", "<", "<=", "=", "<>",
"IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL",
"IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"};
char *gLogicConditionStr[] = {"AND", "OR", "NOT"};
......
......@@ -104,6 +104,8 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
}
pOp->opType = OP_TYPE_IS_TRUE;
pOp->pLeft = pSrc;
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
*pIsTrue = (SNode*)pOp;
return TSDB_CODE_SUCCESS;
}
......@@ -198,6 +200,8 @@ static int32_t calcConstQuery(SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT:
return calcConstSelect((SSelectStmt*)pStmt);
case QUERY_NODE_EXPLAIN_STMT:
return calcConstQuery(((SExplainStmt*)pStmt)->pQuery);
default:
break;
}
......
......@@ -1008,6 +1008,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW);
pReq->ignoreExist = pStmt->ignoreExists;
pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION);
pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION);
pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
}
......
......@@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SDbCfgReq dbCfgReq = {0};
strcpy(dbCfgReq.db, input);
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
void *pBuf = rpcMallocCont(bufLen);
tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
......@@ -309,17 +326,36 @@ PROCESS_QLIST_OVER:
return code;
}
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
SDbCfgRsp out = {0};
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
memcpy(output, &out, sizeof(out));
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
}
#pragma GCC diagnostic pop
......@@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
//QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true;
//queryRsped = true;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
......@@ -994,10 +994,10 @@ _return:
input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) {
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
//if (!queryRsped) {
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//}
QW_RET(TSDB_CODE_SUCCESS);
}
......
......@@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) {
SFilterRangeNode* r = ctx->rs;
while (r) {
FILTER_COPY_RA(ra, &r->ra);
if (num) {
ra->e = r->ra.e;
ra->eflag = r->ra.eflag;
} else {
FILTER_COPY_RA(ra, &r->ra);
}
++num;
r = r->next;
++ra;
}
if (num == 0) {
......@@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
SFilterInfo *info = NULL;
int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) {
SFilterRange ra = {0};
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
......@@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
*win = TSWINDOW_INITIALIZER;
} else {
filterGetRangeNum(prev, &num);
if (num > 1) {
qError("only one time range accepted, num:%d", num);
FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
}
FLT_CHK_JMP(num < 1);
if (num > 1) {
*isStrict = false;
qDebug("more than one time range, num:%d", num);
}
SFilterRange tra;
filterGetRangeRes(prev, &tra);
win->skey = tra.s;
......@@ -3401,6 +3405,30 @@ _return:
}
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
SFilterInfo *info = NULL;
int32_t code = 0;
*isStrict = true;
FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP));
if (info->scalarMode) {
*win = TSWINDOW_INITIALIZER;
*isStrict = false;
goto _return;
}
FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict));
_return:
filterFreeInfo(info);
FLT_RET(code);
}
int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) {
if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) {
return TSDB_CODE_SUCCESS;
......
......@@ -241,6 +241,7 @@ TEST(timerangeTest, greater) {
bool isStrict = false;
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
ASSERT_EQ(code, 0);
ASSERT_EQ(isStrict, true);
ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, INT64_MAX);
//filterFreeInfo(filter);
......@@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) {
STimeWindow win = {0};
bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
ASSERT_EQ(isStrict, true);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, tbig);
......@@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) {
nodesDestroyNode(logicNode);
}
TEST(timerangeTest, greater_and_lower_not_strict) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0};
int64_t tsmall1 = 222, tbig1 = 333;
int64_t tsmall2 = 444, tbig2 = 555;
SNode *list[2] = {0};
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1);
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
list[0] = opNode1;
list[1] = opNode2;
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2);
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
list[0] = opNode1;
list[1] = opNode2;
flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2);
list[0] = logicNode1;
list[1] = logicNode2;
flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2);
//SFilterInfo *filter = NULL;
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
//ASSERT_EQ(code, 0);
STimeWindow win = {0};
bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict);
ASSERT_EQ(isStrict, false);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall1);
ASSERT_EQ(win.ekey, tbig2);
//filterFreeInfo(filter);
nodesDestroyNode(logicNode1);
}
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5]= {1, 2, 3, 4, 5};
......
......@@ -1235,6 +1235,34 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0);
nodeInfo->handle = handle;
return TSDB_CODE_SUCCESS;
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -1247,22 +1275,25 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
int32_t s = taosHashGetSize(pJob->execTasks);
if (s <= 0) {
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask);
if (NULL == pTask) {
if (TDMT_VND_EXPLAIN_RSP == msgType) {
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
} else {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
if (NULL == pTask) {
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
SCH_SET_TASK_HANDLE(pTask, pMsg->handle);
schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode);
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return:
......
......@@ -130,9 +130,9 @@ static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
......@@ -154,20 +154,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \
} while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
} \
break; \
} \
} \
if (i == sz) { \
pMsg = NULL; \
} else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \
} \
} while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \
do { \
......@@ -209,8 +209,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
static void* cliWorkThread(void* arg);
......@@ -722,10 +722,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->hThrdIdx = pCtx->hThrdIdx;
transCtxMerge(&conn->ctx, &pCtx->appCtx);
if (!transQueuePush(&conn->cliMsgs, pMsg)) {
return;
}
transDestroyBuffer(&conn->readBuf);
transQueuePush(&conn->cliMsgs, pMsg);
// tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2);
// return;
//}
// transDestroyBuffer(&conn->readBuf);
cliSend(conn);
} else {
conn = cliCreateConn(pThrd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册