未验证 提交 040292cb 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12718 from taosdata/enh/deleteDupCode

Enh/delete dup code
......@@ -63,11 +63,6 @@ typedef struct SRpcMsg {
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
///
// // SRpcMsg code
// REDIERE,
// NOT READY, EpSet
typedef bool (*RpcRfp)(int32_t code);
typedef struct SRpcInit {
......@@ -80,18 +75,11 @@ typedef struct SRpcInit {
int idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char *user; // user name
char spi; // security parameter index
char encrypt; // encrypt algorithm
char *secret; // key for authentication
char *ckey; // ciphering key
char *user; // user name
// call back to process incoming msg, code shall be ignored by server app
RpcCfp cfp;
// call back to retrieve the client auth info, for server app only
RpcAfp afp;
// user defined retry func
RpcRfp rfp;
......
......@@ -60,7 +60,7 @@ static void registerRequest(SRequestObj *pRequest) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj *pTscObj = pRequest->pTscObj;
STscObj * pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
......@@ -91,7 +91,6 @@ static bool clientRpcRfp(int32_t code) {
}
}
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
......@@ -105,10 +104,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = (char *)auth;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
......@@ -318,7 +313,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
SConfig *pCfg = taosGetCfg();
SConfig * pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
switch (option) {
......
......@@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
......@@ -325,7 +325,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
int32_t validateSversion(SRequestObj* pRequest, void* res) {
SArray* pArray = NULL;
int32_t code = 0;
if (TDMT_VND_SUBMIT == pRequest->type) {
SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) {
......@@ -337,14 +337,13 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pRsp->pBlocks + i;
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
} else if (TDMT_VND_QUERY == pRequest->type) {
}
SCatalog* pCatalog = NULL;
......@@ -365,11 +364,10 @@ void freeRequestRes(SRequestObj* pRequest, void* res) {
if (NULL == res) {
return;
}
if (TDMT_VND_SUBMIT == pRequest->type) {
tFreeSSubmitRsp((SSubmitRsp*)res);
} else if (TDMT_VND_QUERY == pRequest->type) {
}
}
......@@ -1022,7 +1020,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
SRpcInit rpcInit = {0};
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)("_pwd"), strlen("_pwd"), pass);
rpcInit.label = "CHK";
rpcInit.numOfThreads = 1;
rpcInit.cfp = NULL;
......@@ -1030,9 +1027,6 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "_dnd";
rpcInit.ckey = "_key";
rpcInit.spi = 1;
rpcInit.secret = pass;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
......
......@@ -49,9 +49,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
SDnodeTrans * pTrans = &pDnode->trans;
int32_t code = -1;
SRpcMsg *pMsg = NULL;
SRpcMsg * pMsg = NULL;
bool needRelease = false;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
SMgmtWrapper *pWrapper = NULL;
......@@ -179,11 +179,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray *pArray = (*pWrapper->func.getHandlesFp)();
SArray * pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
......@@ -276,15 +276,9 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
rpcInit.secret = pass;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
......
......@@ -48,10 +48,10 @@ void TestClient::DoInit() {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.user = (char*)this->user;
rpcInit.ckey = (char*)"key";
// rpcInit.ckey = (char*)"key";
rpcInit.parent = this;
rpcInit.secret = (char*)secretEncrypt;
rpcInit.spi = 1;
// rpcInit.secret = (char*)secretEncrypt;
// rpcInit.spi = 1;
clientRpc = rpcOpen(&rpcInit);
ASSERT(clientRpc);
......
......@@ -27,16 +27,16 @@
#include "trpc.h"
typedef struct SUdfdContext {
uv_loop_t *loop;
uv_loop_t * loop;
uv_pipe_t ctrlPipe;
uv_signal_t intrSignal;
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_pipe_t listeningPipe;
void *clientRpc;
void * clientRpc;
SCorEpSet mgmtEp;
uv_mutex_t udfsMutex;
SHashObj *udfsHash;
SHashObj * udfsHash;
bool printVersion;
} SUdfdContext;
......@@ -45,7 +45,7 @@ SUdfdContext global;
typedef struct SUdfdUvConn {
uv_stream_t *client;
char *inputBuf;
char * inputBuf;
int32_t inputLen;
int32_t inputCap;
int32_t inputTotal;
......@@ -65,25 +65,25 @@ typedef struct SUdf {
uv_mutex_t lock;
uv_cond_t condReady;
char name[TSDB_FUNC_NAME_LEN];
int8_t funcType;
int8_t scriptType;
int8_t outputType;
char name[TSDB_FUNC_NAME_LEN];
int8_t funcType;
int8_t scriptType;
int8_t outputType;
int32_t outputLen;
int32_t bufSize;
char path[PATH_MAX];
char path[PATH_MAX];
uv_lib_t lib;
uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc;
TUdfScalarProcFunc scalarProcFunc;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
} SUdf;
// TODO: add private udf structure.
......@@ -98,9 +98,9 @@ typedef enum EUdfdRpcReqRspType {
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
int32_t code;
void * param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
......@@ -136,7 +136,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
SUdf * udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType;
......@@ -145,7 +145,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
......@@ -168,11 +169,11 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
void * pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
......@@ -194,7 +195,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
int32_t udfdConnectToMnode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.app, "udfd", sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
......@@ -203,7 +204,7 @@ int32_t udfdConnectToMnode() {
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
void * pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
......@@ -240,17 +241,17 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udf->lib, initFuncName, (void**)(&udf->initFunc));
uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udf->lib, destroyFuncName, (void**)(&udf->destroyFunc));
uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc));
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
......@@ -270,87 +271,86 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
//TODO: merge
// TODO: merge
}
return 0;
}
void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo( "setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
uv_mutex_unlock(&udf->lock);
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
SUdfResponse rsp;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
uv_mutex_unlock(&udf->lock);
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
SUdfResponse rsp;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp);
uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType,
call->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf;
SUdfResponse response = {0};
SUdfResponse *rsp = &response;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf * udf = handle->udf;
SUdfResponse response = {0};
SUdfResponse * rsp = &response;
SUdfCallResponse *subRsp = &rsp->callRsp;
int32_t code = TSDB_CODE_SUCCESS;
switch(call->callType) {
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0};
......@@ -363,9 +363,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
udf->aggStartFunc(&outBuf);
subRsp->resultBuf = outBuf;
break;
......@@ -373,9 +371,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
case TSDB_UDF_CALL_AGG_PROC: {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfDataDataBlock(&input);
......@@ -384,9 +380,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
......@@ -429,20 +423,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
default:
break;
}
taosMemoryFree(uvUdf->input.base);
return;
}
void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) {
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest *teardown = &request->teardown;
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
SUdf * udf = handle->udf;
bool unloadUdf = false;
int32_t code = TSDB_CODE_SUCCESS;
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
......@@ -568,7 +561,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void udfdHandleRequest(SUdfdUvConn *conn) {
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
udfWork->client = conn->client;
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
......@@ -653,11 +646,11 @@ static bool udfdRpcRfp(int32_t code) {
}
}
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet* mgmtEpSet = &(pEpSet->epSet);
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
......@@ -694,7 +687,6 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return 0;
}
int32_t udfdOpenClientRpc() {
SRpcInit rpcInit = {0};
rpcInit.label = "UDFD";
......@@ -704,15 +696,9 @@ int32_t udfdOpenClientRpc() {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
rpcInit.secret = pass;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
......@@ -823,7 +809,7 @@ static int32_t udfdUvInit() {
return 0;
}
static void udfdCloseWalkCb(uv_handle_t* handle, void* arg) {
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
......@@ -883,7 +869,7 @@ int main(int argc, char *argv[]) {
int32_t retryMnodeTimes = 0;
int32_t code = 0;
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
uv_sleep(500 * ( 1 << retryMnodeTimes));
uv_sleep(500 * (1 << retryMnodeTimes));
code = udfdConnectToMnode();
if (code == 0) {
break;
......
......@@ -99,7 +99,7 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
if (fstSliceIsEmpty(s)) {
return;
}
size_t sz = taosArrayGetSize(nodes->stack) - 1;
int32_t sz = taosArrayGetSize(nodes->stack) - 1;
FstBuilderNodeUnfinished* un = taosArrayGet(nodes->stack, sz);
assert(un->last == NULL);
......@@ -130,11 +130,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes* node, FstSlice bs) {
FstSlice* s = &bs;
size_t ssz = taosArrayGetSize(node->stack); // stack size
int32_t ssz = taosArrayGetSize(node->stack); // stack size
uint64_t count = 0;
int32_t lsz; // data len
uint8_t* data = fstSliceData(s, &lsz);
for (size_t i = 0; i < ssz && i < lsz; i++) {
for (int32_t i = 0; i < ssz && i < lsz; i++) {
FstBuilderNodeUnfinished* un = taosArrayGet(node->stack, i);
if (un->last->inp == data[i]) {
count++;
......@@ -147,8 +147,8 @@ uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes* node, FstSlice bs)
uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node, FstSlice bs, Output in, Output* out) {
FstSlice* s = &bs;
size_t lsz = (size_t)(s->end - s->start + 1); // data len
size_t ssz = taosArrayGetSize(node->stack); // stack size
int32_t lsz = (size_t)(s->end - s->start + 1); // data len
int32_t ssz = taosArrayGetSize(node->stack); // stack size
*out = in;
uint64_t i = 0;
for (i = 0; i < lsz && i < ssz; i++) {
......@@ -245,7 +245,7 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
return;
}
void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuilderNode* node) {
size_t sz = taosArrayGetSize(node->trans);
int32_t sz = taosArrayGetSize(node->trans);
assert(sz <= 256);
uint8_t tSize = 0;
......@@ -253,7 +253,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
// finalOutput.is_zero()
bool anyOuts = (node->finalOutput != 0);
for (size_t i = 0; i < sz; i++) {
for (int32_t i = 0; i < sz; i++) {
FstTransition* t = taosArrayGet(node->trans, i);
tSize = TMAX(tSize, packDeltaSize(addr, t->addr));
oSize = TMAX(oSize, packSize(t->out));
......@@ -301,7 +301,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
/// for (uint8_t i = 0; i < 256; i++) {
// index[i] = 255;
///}
for (size_t i = 0; i < sz; i++) {
for (int32_t i = 0; i < sz; i++) {
FstTransition* t = taosArrayGet(node->trans, i);
index[t->inp] = i;
// fstPackDeltaIn(w, addr, t->addr, tSize);
......@@ -731,7 +731,7 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
}
bool fstNodeCompile(FstNode* node, void* w, CompiledAddr lastAddr, CompiledAddr addr, FstBuilderNode* builderNode) {
size_t sz = taosArrayGetSize(builderNode->trans);
int32_t sz = taosArrayGetSize(builderNode->trans);
assert(sz < 256);
if (sz == 0 && builderNode->isFinal && builderNode->finalOutput == 0) {
return true;
......@@ -959,8 +959,8 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished* unNode, O
if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) {
unNode->node->finalOutput += out;
}
size_t sz = taosArrayGetSize(unNode->node->trans);
for (size_t i = 0; i < sz; i++) {
int32_t sz = taosArrayGetSize(unNode->node->trans);
for (int32_t i = 0; i < sz; i++) {
FstTransition* trn = taosArrayGet(unNode->node->trans, i);
trn->out += out;
}
......@@ -1077,7 +1077,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
}
for (size_t i = 0; i < taosArrayGetSize(nodes); i++) {
for (int32_t i = 0; i < taosArrayGetSize(nodes); i++) {
FstNode** node = (FstNode**)taosArrayGet(nodes, i);
fstNodeDestroy(*node);
}
......@@ -1352,7 +1352,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
taosArrayPush(sws->stack, &s2);
size_t isz = taosArrayGetSize(sws->inp);
int32_t isz = taosArrayGetSize(sws->inp);
uint8_t* buf = (uint8_t*)taosMemoryMalloc(isz * sizeof(uint8_t));
for (uint32_t i = 0; i < isz; i++) {
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
......
......@@ -116,7 +116,7 @@ TFileCache* tfileCacheCreate(const char* path) {
continue;
}
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
char buf[128] = {0};
int32_t sz = indexSerialCacheKey(&key, buf);
......@@ -230,7 +230,7 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, cost);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
......@@ -890,7 +890,7 @@ static int tfileWriteFooter(TFileWriter* write) {
char buf[sizeof(tfileMagicNumber) + 1] = {0};
void* pBuf = (void*)buf;
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
int nwrite = write->ctx->write(write->ctx, buf, (int32_t)strlen(buf));
indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
assert(nwrite == sizeof(tfileMagicNumber));
......
......@@ -37,14 +37,14 @@ static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
}
void iIntersection(SArray *inters, SArray *final) {
int32_t sz = taosArrayGetSize(inters);
int32_t sz = (int32_t)taosArrayGetSize(inters);
if (sz <= 0) {
return;
}
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
for (int i = 0; i < sz; i++) {
SArray *t = taosArrayGetP(inters, i);
mi[i].len = taosArrayGetSize(t);
mi[i].len = (int32_t)taosArrayGetSize(t);
mi[i].idx = 0;
}
......@@ -70,7 +70,7 @@ void iIntersection(SArray *inters, SArray *final) {
taosMemoryFreeClear(mi);
}
void iUnion(SArray *inters, SArray *final) {
int32_t sz = taosArrayGetSize(inters);
int32_t sz = (int32_t)taosArrayGetSize(inters);
if (sz <= 0) {
return;
}
......@@ -82,7 +82,7 @@ void iUnion(SArray *inters, SArray *final) {
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
for (int i = 0; i < sz; i++) {
SArray *t = taosArrayGetP(inters, i);
mi[i].len = taosArrayGetSize(t);
mi[i].len = (int32_t)taosArrayGetSize(t);
mi[i].idx = 0;
}
while (1) {
......@@ -117,8 +117,8 @@ void iUnion(SArray *inters, SArray *final) {
}
void iExcept(SArray *total, SArray *except) {
int32_t tsz = taosArrayGetSize(total);
int32_t esz = taosArrayGetSize(except);
int32_t tsz = (int32_t)taosArrayGetSize(total);
int32_t esz = (int32_t)taosArrayGetSize(except);
if (esz == 0 || tsz == 0) {
return;
}
......@@ -141,7 +141,10 @@ int uidCompare(const void *a, const void *b) {
// add more version compare
uint64_t u1 = *(uint64_t *)a;
uint64_t u2 = *(uint64_t *)b;
return u1 - u2;
if (u1 == u2) {
return 0;
}
return u1 < u2 ? -1 : 1;
}
int verdataCompare(const void *a, const void *b) {
SIdxVerdata *va = (SIdxVerdata *)a;
......
......@@ -48,7 +48,7 @@ class FstWriter {
class FstReadMemory {
public:
FstReadMemory(size_t size, const std::string& fileName = "/tmp/tindex.tindex") {
FstReadMemory(int32_t size, const std::string& fileName = "/tmp/tindex.tindex") {
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = fstCountingWriterCreate(_wc);
_size = size;
......@@ -152,7 +152,7 @@ class FstReadMemory {
Fst* _fst;
FstSlice _s;
WriterCtx* _wc;
size_t _size;
int32_t _size;
};
#define L 100
......
......@@ -183,9 +183,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "sync-io";
rpcInit.secret = "sync-io";
rpcInit.ckey = "key";
rpcInit.spi = 0;
rpcInit.connType = TAOS_CONN_CLIENT;
io->clientRpc = rpcOpen(&rpcInit);
......@@ -206,7 +203,6 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit.cfp = syncIOProcessRequest;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = syncIOAuth;
rpcInit.parent = io;
rpcInit.connType = TAOS_CONN_SERVER;
......
......@@ -52,23 +52,15 @@ typedef struct {
int idleTime; // milliseconds;
uint16_t localPort;
int8_t connType;
int64_t index;
char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
char user[TSDB_UNI_LEN]; // meter ID
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code);
int index;
int32_t refCount;
void* parent;
void* idPool; // handle to ID pool
void* tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization
TdThreadMutex mutex;
} SRpcInfo;
......
......@@ -69,9 +69,6 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
}
if (pInit->secret) {
memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret));
}
return pRpc;
}
void rpcClose(void* arg) {
......
......@@ -134,7 +134,6 @@ int main(int argc, char *argv[]) {
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
......
......@@ -118,9 +118,6 @@ int main(int argc, char *argv[]) {
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcDebugFlag = 131;
......@@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
......@@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
......
......@@ -123,7 +123,6 @@ int main(int argc, char *argv[]) {
rpcInit.cfp = processRequestMsg;
rpcInit.sessions = 1000;
rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo;
rpcDebugFlag = 131;
......
......@@ -21,15 +21,15 @@
#include "tutil.h"
typedef struct {
int index;
SEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t * pOverSem;
int index;
SEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t * pOverSem;
TdThread thread;
void * pRpc;
void * pRpc;
} SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
......@@ -103,7 +103,7 @@ int main(int argc, char *argv[]) {
char secret[20] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
TdThreadAttr thattr;
TdThreadAttr thattr;
// server info
epSet.inUse = 0;
......@@ -119,9 +119,6 @@ int main(int argc, char *argv[]) {
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i = 1; i < argc; ++i) {
......@@ -144,9 +141,7 @@ int main(int argc, char *argv[]) {
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
......@@ -160,8 +155,6 @@ int main(int argc, char *argv[]) {
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
......
......@@ -50,9 +50,6 @@ class Client {
rpcInit_.numOfThreads = nThread;
rpcInit_.cfp = processResp;
rpcInit_.user = (char *)user;
rpcInit_.secret = (char *)secret;
rpcInit_.ckey = (char *)ckey;
rpcInit_.spi = 1;
rpcInit_.parent = this;
rpcInit_.connType = TAOS_CONN_CLIENT;
this->transCli = rpcOpen(&rpcInit_);
......@@ -117,9 +114,6 @@ class Server {
rpcInit_.numOfThreads = 5;
rpcInit_.cfp = processReq;
rpcInit_.user = (char *)user;
rpcInit_.secret = (char *)secret;
rpcInit_.ckey = (char *)ckey;
rpcInit_.spi = 1;
rpcInit_.connType = TAOS_CONN_SERVER;
}
void Start() {
......
......@@ -31,9 +31,6 @@ static void shellWorkAsClient() {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "_dnd";
rpcInit.ckey = "_key";
rpcInit.spi = 1;
rpcInit.secret = pass;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册