提交 d9c83f63 编写于 作者: dengyihao's avatar dengyihao

enh: change rpcGetConnInfo

上级 4e6010f5
......@@ -136,14 +136,14 @@ int indexRebuild(SIndex* index, SIndexOpts* opt);
* @param index (output, index json object)
* @return error code
*/
int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
int indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
* @return void
*/
void tIndexJsonClose(SIndexJson* index);
void indexJsonClose(SIndexJson* index);
/*
* insert terms into index
......@@ -152,7 +152,7 @@ void tIndexJsonClose(SIndexJson* index);
* @param uid (input, uid of terms)
* @return error code
*/
int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
......@@ -161,7 +161,7 @@ int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
* @return error code
*/
int tIndexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
int indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/*
* @param
* @param
......
......@@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo {
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
SRpcConnInfo connInfo;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
......
......@@ -22,17 +22,17 @@ static void dmSendRsp(SRpcMsg *pMsg);
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if (IsReq(pRpc) && rpcGetConnInfo(&pRpc->info, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
return -1;
}
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
// if (IsReq(pRpc)) {
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
// return -1;
//}
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
memcpy(pMsg->conn.user, connInfo.user, TSDB_USER_LEN);
pMsg->conn.clientIp = connInfo.clientIp;
pMsg->conn.clientPort = connInfo.clientPort;
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
pMsg->conn.clientIp = pConnInfo->clientIp;
pMsg->conn.clientPort = pConnInfo->clientPort;
return 0;
}
......
......@@ -99,7 +99,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
indexMultiTermAdd(terms, term);
}
}
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms);
#endif
return 0;
......
......@@ -131,7 +131,7 @@ typedef struct TFileCacheKey {
char* colName;
int32_t nColName;
} ICacheKey;
int indexFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int64_t indexAddRef(void* p);
int32_t indexRemoveRef(int64_t ref);
......
......@@ -455,7 +455,7 @@ static void idxDestroyFinalRslt(SArray* result) {
taosArrayDestroy(result);
}
int indexFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
if (sIdx == NULL) {
return -1;
}
......
......@@ -69,7 +69,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRsl
cacheSearchRange_JSON}};
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static bool idxCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
if (cache == NULL) {
......@@ -476,7 +476,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
iiter->val.colVal = NULL;
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
iiter->next = indexCacheIteratorNext;
iiter->next = idxCacheIteratorNext;
iiter->getValue = indexCacheIteratorGetValue;
taosThreadMutexUnlock(&cache->mtx);
......@@ -748,9 +748,9 @@ static void doMergeWork(SSchedMsg* msg) {
int quit = msg->thandle ? true : false;
taosMemoryFree(msg->thandle);
indexFlushCacheToTFile(sidx, pCache, quit);
idxFlushCacheToTFile(sidx, pCache, quit);
}
static bool indexCacheIteratorNext(Iterate* itera) {
static bool idxCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
if (iter == NULL) {
return false;
......
......@@ -355,7 +355,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype);
ret = tIndexJsonSearch(arg->ivtIdx, mtm, output->result);
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
} else {
bool reverse;
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
......
......@@ -15,11 +15,11 @@
#include "index.h"
#include "indexInt.h"
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
int indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return indexOpen(opts, path, index);
}
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
......@@ -36,7 +36,7 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
return indexPut(index, terms, uid);
}
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
......@@ -54,7 +54,7 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re
return indexSearch(index, tq, result);
}
void tIndexJsonClose(SIndexJson *index) {
void indexJsonClose(SIndexJson *index) {
// handle close
return indexClose(index);
}
......@@ -56,11 +56,11 @@ class JsonEnv : public ::testing::Test {
initLog();
opts = indexOptsCreate();
int ret = tIndexJsonOpen(opts, dir.c_str(), &index);
int ret = indexJsonOpen(opts, dir.c_str(), &index);
assert(ret == 0);
}
virtual void TearDown() {
tIndexJsonClose(index);
indexJsonClose(index);
indexOptsDestroy(opts);
printf("destory\n");
taosMsleep(1000);
......@@ -75,7 +75,7 @@ static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtyp
(const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
......@@ -86,7 +86,7 @@ static void delData(SIndexJson* index, const std::string& colName, int8_t dtype,
(const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
......@@ -99,7 +99,7 @@ static void Search(SIndexJson* index, const std::string& colNam, int8_t dtype, v
SArray* res = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, (EIndexQueryType)filterType);
tIndexJsonSearch(index, mq, res);
indexJsonSearch(index, mq, res);
indexMultiTermQueryDestroy(mq);
*result = res;
}
......@@ -112,7 +112,7 @@ TEST_F(JsonEnv, testWrite) {
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -125,7 +125,7 @@ TEST_F(JsonEnv, testWrite) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -138,7 +138,7 @@ TEST_F(JsonEnv, testWrite) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -152,7 +152,7 @@ TEST_F(JsonEnv, testWrite) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(100, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -167,7 +167,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -182,7 +182,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -196,7 +196,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -210,7 +210,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(10, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -225,7 +225,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -240,7 +240,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(10, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -258,7 +258,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -271,7 +271,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -284,7 +284,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -297,7 +297,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -310,7 +310,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -324,7 +324,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -339,7 +339,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -354,7 +354,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -369,7 +369,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -385,7 +385,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -398,7 +398,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -412,7 +412,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -426,7 +426,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -441,7 +441,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -455,7 +455,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -469,7 +469,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -483,7 +483,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i);
indexJsonPut(index, terms, i);
indexMultiTermDestroy(terms);
}
}
......@@ -498,7 +498,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......@@ -511,7 +511,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, i + 1000);
indexJsonPut(index, terms, i + 1000);
indexMultiTermDestroy(terms);
}
}
......@@ -526,7 +526,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
indexJsonSearch(index, mq, result);
EXPECT_EQ(2000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
......
......@@ -307,6 +307,13 @@ static void uvHandleReq(SSvrConn* pConn) {
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
}
// set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(refMgt, pConn->refId);
STrans* pTransInst = pConn->pTransInst;
......@@ -1153,34 +1160,6 @@ _return2:
rpcFreeCont(msg->pCont);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) {
if (thandle == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
SRpcHandleInfo* pInfo = thandle;
SExHandle* exh = pInfo->handle;
int64_t refId = pInfo->refId;
ASYNC_CHECK_HANDLE(exh, refId);
// SExHandle* ex = thandle;
SSvrConn* pConn = exh->handle;
if (pConn == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
transReleaseExHandle(refMgt, refId);
return -1;
}
struct sockaddr_in addr = pConn->addr;
pConnInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
pConnInfo->clientPort = ntohs(addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(refMgt, refId);
return 0;
_return1:
transReleaseExHandle(refMgt, refId);
return -1;
_return2:
return -1;
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册