Commit 41fb517f authored by Haojun Liao

[td-11818] merge 3.0

......@@ -172,7 +172,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
......
......@@ -48,7 +48,6 @@ typedef struct SOutputData {
int8_t compressed;
char* pData;
bool queryEnd;
int32_t scheduleJobNo;
int32_t bufStatus;
int64_t useconds;
int8_t precision;
......
......@@ -29,8 +29,6 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_SCHEDULE_DATA_SINK:
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -196,7 +196,6 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput->bufStatus = updateStatus(pDispatcher);
pthread_mutex_lock(&pDispatcher->mutex);
pOutput->queryEnd = pDispatcher->queryEnd;
pOutput->scheduleJobNo = 0;
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->schema.precision;
pthread_mutex_unlock(&pDispatcher->mutex);
......
......@@ -31,8 +31,6 @@ enum {
QW_PHASE_POST_QUERY,
QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
QW_PHASE_PRE_SINK,
QW_PHASE_POST_SINK,
QW_PHASE_PRE_FETCH,
QW_PHASE_POST_FETCH,
};
......@@ -105,10 +103,12 @@ typedef struct SQWTaskStatus {
typedef struct SQWTaskCtx {
SRWLatch lock;
int32_t phase;
int32_t sinkId;
int32_t readyCode;
int8_t phase;
bool emptyRes;
int8_t queryContinue;
int8_t inQueue;
int32_t rspCode;
int8_t events[QW_EVENT_MAX];
......@@ -144,7 +144,11 @@ typedef struct SQWorkerMgmt {
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IN_EXECUTOR(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_FETCH)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
......
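The new QW_SET_RSP_CODE macro above records a task's response code with a compare-and-swap, so only the first non-zero code sticks and later writers become no-ops. A minimal sketch of the same set-once pattern in portable C11, assuming stdatomic is available (the engine itself uses its own atomic_val_compare_exchange_32 wrapper):

#include <stdatomic.h>
#include <stdint.h>

static _Atomic int32_t rspCode = 0;   // stand-in for ctx->rspCode

// Record a code only if none has been recorded yet; later calls do nothing,
// mirroring atomic_val_compare_exchange_32(&ctx->rspCode, 0, code).
static void setRspCodeOnce(int32_t code) {
  int32_t expected = 0;
  atomic_compare_exchange_strong(&rspCode, &expected, code);
}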
This diff has been collapsed.
......@@ -229,42 +229,6 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
......@@ -366,25 +330,6 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSinkDataReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
//dsScheduleProcess();
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
......
......@@ -102,38 +102,110 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SEpSet* pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
// int32_t code; // error code
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
} SRpcReqContext;
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
// int32_t code; // error code
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
} STransConnCtx;
#pragma pack(push, 1)
typedef struct {
char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits
char spi : 3; // security parameter index
char encrypt : 3; // encrypt algorithm, 0: no encryption
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
uint8_t content[0]; // message body starts from here
} STransMsgHead;
typedef struct {
int32_t reserved;
int32_t contLen;
} STransCompMsg;
typedef struct {
uint32_t timeStamp;
uint8_t auth[TSDB_AUTH_LEN];
} STransDigestMsg;
#pragma pack(pop)
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
#define RPC_RESERVE_SIZE (sizeof(STransConnCtx))
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STransConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead))
#define transIsReq(type) (type & 1U)
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void transConnCtxDestroy(STransConnCtx* ctx);
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
#endif
......@@ -45,6 +45,9 @@ extern "C" {
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void taosCloseServer(void* arg);
void taosCloseClient(void* arg);
typedef struct {
int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages
......
......@@ -17,15 +17,9 @@
#include "transComm.h"
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
taosInitServer, taosInitClient};
void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient};
void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
......@@ -38,13 +32,18 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->cfp = pInit->cfp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->connType = pInit->connType;
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc;
}
void rpcClose(void* arg) { return; }
void rpcClose(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
free(pRpc);
return;
}
void* rpcMallocCont(int contLen) {
int size = contLen + RPC_MSG_OVERHEAD;
int size = contLen + TRANS_MSG_OVERHEAD;
char* start = (char*)calloc(1, (size_t)size);
if (start == NULL) {
......@@ -53,7 +52,7 @@ void* rpcMallocCont(int contLen) {
} else {
tTrace("malloc mem:%p size:%d", start, size);
}
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
return start + sizeof(STransMsgHead);
}
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
......
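The rewritten rpcMallocCont above sizes the allocation as contLen + TRANS_MSG_OVERHEAD and hands the caller a pointer just past the transport header, so transHeadFromCont can later recover the header by walking back sizeof(STransMsgHead) bytes. A minimal sketch of that pointer arithmetic, using a simplified stand-in for STransMsgHead (the real overhead also reserves room for the digest):

#include <stdint.h>
#include <stdlib.h>

typedef struct { int32_t msgLen; uint32_t msgType; } MiniHead;  // simplified STransMsgHead

#define miniHeadFromCont(cont) ((MiniHead*)((char*)(cont) - sizeof(MiniHead)))

static void* miniMallocCont(int contLen) {
  // header + content; the real TRANS_MSG_OVERHEAD additionally reserves digest space
  char* start = calloc(1, sizeof(MiniHead) + (size_t)contLen);
  return start == NULL ? NULL : start + sizeof(MiniHead);
}

static void miniFreeCont(void* cont) {
  if (cont != NULL) free(miniHeadFromCont(cont));  // walk back to the true allocation
}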
......@@ -21,15 +21,18 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
uv_write_t* writeReq;
SConnBuffer readBuf;
void* data;
queue conn;
char spi;
char secured;
} SCliConn;
typedef struct SCliMsg {
SRpcReqContext* context;
queue q;
uint64_t st;
STransConnCtx* ctx;
SRpcMsg msg;
queue q;
uint64_t st;
} SCliMsg;
typedef struct SCliThrdObj {
......@@ -53,27 +56,92 @@ typedef struct SClientObj {
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// process data read from server, auth/decompress etc
static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server
static bool clientReadComplete(SConnBuffer* pBuf);
// alloc buf for read
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket
static void clientWriteCb(uv_write_t* req, int status);
// callback after conn to server
static void clientConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn);
static void clientMsgDestroy(SCliMsg* pMsg);
static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) {
// impl
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
static bool clientReadComplete(SConnBuffer* data) {
STransMsgHead head;
int32_t headLen = sizeof(head);
if (data->len >= headLen) {
memcpy((char*)&head, data->buf, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
return true;
}
} else {
return false;
}
}
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
// impl later
static const int CAPACITY = 512;
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
}
}
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
}
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later
SCliConn* conn = handle->data;
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
if (clientReadComplete(pBuf)) {
tDebug("alread read complete pack");
clientProcessData(conn);
} else {
tDebug("read halp packet, continue to read");
}
return;
}
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
}
//
uv_close((uv_handle_t*)handle, clientDestroy);
}
......@@ -96,15 +164,17 @@ static void clientWriteCb(uv_write_t* req, int status) {
return;
}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb);
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
// impl later
}
static void clientWrite(SCliConn* pConn) {
SCliMsg* pMsg = pConn->data;
SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont);
int msgLen = rpcMsgLenFromCont(pMsg->context->contLen);
char* msg = (char*)(pHead);
SCliMsg* pCliMsg = pConn->data;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
int msgLen = transMsgLenFromCont(pMsg->contLen);
char* msg = (char*)(pHead);
uv_buf_t wb = uv_buf_init(msg, msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
......@@ -118,23 +188,22 @@ static void clientConnCb(uv_connect_t* req, int status) {
return;
}
SCliMsg* pMsg = pConn->data;
SEpSet* pEpSet = &pMsg->context->epSet;
SRpcMsg rpcMsg;
// rpcMsg.ahandle = pMsg->context->ahandle;
// rpcMsg.pCont = NULL;
SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx;
SRpcMsg rpcMsg;
rpcMsg.ahandle = pCtx->ahandle;
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
uint32_t port = pEpSet->port[pEpSet->inUse];
if (status != 0) {
// call user fp later
tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
SRpcInfo* pRpc = pMsg->context->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
uv_close((uv_handle_t*)req->handle, clientDestroy);
return;
}
assert(pConn->stream == req->handle);
clientWrite(pConn);
}
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
......@@ -147,24 +216,20 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn)
}
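getConnFromCache and addConnToCache above are left as stubs in this commit. A minimal sketch of one way they could be backed, assuming a small singly linked list keyed by ip and port (SCliConn is the struct defined earlier in this file; the usual string.h, stdio.h and stdlib.h headers are assumed to be available via transComm.h):

typedef struct SConnCacheItem {
  char                   ip[64];
  uint32_t               port;
  SCliConn*              conn;
  struct SConnCacheItem* next;
} SConnCacheItem;

// Pop a cached connection for ip:port, or return NULL when none is idle.
static SCliConn* connCacheGet(SConnCacheItem** head, const char* ip, uint32_t port) {
  for (SConnCacheItem** p = head; *p != NULL; p = &(*p)->next) {
    if ((*p)->port == port && strcmp((*p)->ip, ip) == 0) {
      SConnCacheItem* hit  = *p;
      SCliConn*       conn = hit->conn;
      *p = hit->next;   // unlink before handing the connection back
      free(hit);
      return conn;
    }
  }
  return NULL;
}

// Park an idle connection so a later request to the same endpoint can reuse it.
static void connCachePut(SConnCacheItem** head, const char* ip, uint32_t port, SCliConn* conn) {
  SConnCacheItem* item = calloc(1, sizeof(SConnCacheItem));
  snprintf(item->ip, sizeof(item->ip), "%s", ip);
  item->port = port;
  item->conn = conn;
  item->next = *head;
  *head = item;
}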
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SEpSet* pEpSet = &pMsg->context->epSet;
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
uint32_t port = pEpSet->port[pEpSet->inUse];
uint64_t el = taosGetTimestampUs() - pMsg->st;
uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st;
tDebug("msg tran time cost: %" PRIu64 "", el);
et = taosGetTimestampUs();
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
STransConnCtx* pCtx = pMsg->ctx;
SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port);
if (conn != NULL) {
// impl later
conn->data = pMsg;
conn->writeReq->data = conn;
clientWrite(conn);
// uv_buf_t wb;
// uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
} else {
SCliConn* conn = malloc(sizeof(SCliConn));
SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
......@@ -172,23 +237,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->connReq.data = conn;
conn->data = pMsg;
struct sockaddr_in addr;
uv_ip4_addr(fqdn, port, &addr);
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
// SRpcMsg rpcMsg;
// SEpSet* pEpSet = &pMsg->context->epSet;
// SRpcInfo* pRpc = pMsg->context->pRpc;
//// rpcMsg.ahandle = pMsg->context->ahandle;
// rpcMsg.pCont = NULL;
// rpcMsg.ahandle = pMsg->context->ahandle;
// uint64_t el1 = taosGetTimestampUs() - et;
// tError("msg tran back first: time cost: %" PRIu64 "", el1);
// et = taosGetTimestampUs();
//(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
// uint64_t el2 = taosGetTimestampUs() - et;
// tError("msg tran back second: time cost: %" PRIu64 "", el2);
}
}
static void clientAsyncCb(uv_async_t* handle) {
......@@ -205,7 +258,8 @@ static void clientAsyncCb(uv_async_t* handle) {
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
pMsg = QUEUE_DATA(h, SCliMsg, q);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
clientHandleReq(pMsg, pThrd);
count++;
if (count >= 2) {
......@@ -221,6 +275,7 @@ static void* clientThread(void* arg) {
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj));
memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
......@@ -245,22 +300,44 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
return cli;
}
static void clientMsgDestroy(SCliMsg* pMsg) {
// impl later
free(pMsg);
}
void taosCloseClient(void* arg) {
// impl later
SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) {
SCliThrdObj* pThrd = cli->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->cliAsync);
free(pThrd->loop);
free(pThrd);
}
free(cli->pThreadObj);
free(cli);
}
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
uint32_t port = pEpSet->port[pEpSet->inUse];
SRpcInfo* pRpc = (SRpcInfo*)shandle;
int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// impl later
}
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
SRpcReqContext* pContext;
pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle;
pContext->pRpc = (SRpcInfo*)shandle;
pContext->epSet = *pEpSet;
pContext->contLen = len;
pContext->pCont = pMsg->pCont;
pContext->msgType = pMsg->msgType;
pContext->oldInUse = pEpSet->inUse;
pCtx->pRpc = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip);
pCtx->port = port;
assert(pRpc->connType == TAOS_CONN_CLIENT);
// atomic or not
......@@ -268,14 +345,15 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
if (pRpc->index++ >= pRpc->numOfThreads) {
pRpc->index = 0;
}
SCliMsg* msg = malloc(sizeof(SCliMsg));
msg->context = pContext;
msg->st = taosGetTimestampUs();
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
pthread_mutex_lock(&thrd->msgMtx);
QUEUE_PUSH(&thrd->msg, &msg->q);
QUEUE_PUSH(&thrd->msg, &cliMsg->q);
pthread_mutex_unlock(&thrd->msgMtx);
uv_async_send(thrd->cliAsync);
......
......@@ -30,6 +30,20 @@ int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
return ret;
}
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
......@@ -41,6 +55,17 @@ void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
memcpy(pAuth, context.digest, sizeof(context.digest));
}
void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHead* pHead = rpcHeadFromCont(pCont);
......@@ -81,6 +106,54 @@ int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
return finalLen;
}
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
return false;
// SRpcHead* pHead = rpcHeadFromCont(pCont);
bool succ = false;
int overhead = sizeof(STransCompMsg);
if (!NEEDTO_COMPRESSS_MSG(len)) {
return succ;
}
char* buf = malloc(len + overhead + 8); // 8 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
*flen = len;
return succ;
}
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (clen > 0 && clen < len - overhead) {
STransCompMsg* pComp = (STransCompMsg*)msg;
pComp->reserved = 0;
pComp->contLen = htonl(len);
memcpy(msg + overhead, buf, clen);
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
*flen = clen + overhead;
succ = true;
} else {
*flen = len;
succ = false;
}
free(buf);
return succ;
}
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
// impl later
return false;
STransCompMsg* pComp = (STransCompMsg*)msg;
int overhead = sizeof(STransCompMsg);
int clen = 0;
return false;
}
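transDecompressMsg above is still a stub. A minimal sketch of the inverse of transCompressMsg, assuming the same STransCompMsg framing, LZ4_decompress_safe from liblz4, and a caller-provided output buffer (the real function decompresses in place with a different signature, so this helper is hypothetical and illustrative only):

// Hypothetical helper, not the commit's API: decode a buffer produced by
// transCompressMsg into 'out', which must hold the original message length.
static bool transDecompressMsgSketch(const char* msg, int32_t len, char* out, int32_t* flen) {
  const STransCompMsg* pComp = (const STransCompMsg*)msg;
  int                  overhead = sizeof(STransCompMsg);
  int32_t              origLen = (int32_t)ntohl(pComp->contLen);  // stored by the compressor
  int32_t              dlen = LZ4_decompress_safe(msg + overhead, out, len - overhead, origLen);
  if (dlen != origLen) {
    return false;  // truncated or corrupted payload
  }
  *flen = origLen;
  return true;
}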
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
int overhead = sizeof(SRpcComp);
SRpcHead* pNewHead = NULL;
......@@ -114,4 +187,8 @@ SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
return pHead;
}
void transConnCtxDestroy(STransConnCtx* ctx) {
free(ctx->ip);
free(ctx);
}
#endif
......@@ -16,13 +16,6 @@
#ifdef USE_UV
#include "transComm.h"
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
typedef struct SConn {
uv_tcp_t* pTcp;
uv_write_t* pWriter;
......@@ -100,31 +93,32 @@ static void* acceptThread(void* arg);
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
/*
* format of data buffer:
* |<-------SRpcReqContext------->|<------------data read from socket----------->|
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/
static const int CAPACITY = 1024;
SConn* conn = handle->data;
SConnBuffer* pBuf = &conn->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
buf->base = pBuf->buf;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
}
}
buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
buf->base = pBuf->buf + pBuf->len;
buf->len = pBuf->cap - pBuf->len;
}
}
......@@ -133,11 +127,11 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
//
static bool readComplete(SConnBuffer* data) {
// TODO(yihao): handle pipeline later
SRpcHead rpcHead;
int32_t headLen = sizeof(rpcHead);
STransMsgHead head;
int32_t headLen = sizeof(head);
if (data->len >= headLen) {
memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
memcpy((char*)&head, data->buf, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
......@@ -150,21 +144,21 @@ static bool readComplete(SConnBuffer* data) {
}
static void uvDoProcess(SRecvInfo* pRecv) {
SRpcHead* pHead = (SRpcHead*)pRecv->msg;
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
SConn* pConn = pRecv->thandle;
// impl later
STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
SConn* pConn = pRecv->thandle;
tDump(pRecv->msg, pRecv->msgLen);
terrno = 0;
SRpcReqContext* pContest;
// SRpcReqContext* pContest;
// do auth and check
}
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
SRpcHead* pHead = (SRpcHead*)msg;
int code = 0;
STransMsgHead* pHead = (STransMsgHead*)msg;
int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
// secured link, or no authentication
......@@ -224,7 +218,7 @@ static void uvProcessData(SConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->connBuf;
p->msg = pBuf->buf + RPC_RESERVE_SIZE;
p->msg = pBuf->buf;
p->msgLen = pBuf->len;
p->ip = 0;
p->port = 0;
......@@ -233,11 +227,10 @@ static void uvProcessData(SConn* pConn) {
p->chandle = NULL;
//
SRpcHead* pHead = (SRpcHead*)p->msg;
assert(rpcIsReq(pHead->msgType));
STransMsgHead* pHead = (STransMsgHead*)p->msg;
assert(transIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
pConn->ahandle = (void*)pHead->ahandle;
// auth here
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
......@@ -247,14 +240,19 @@ static void uvProcessData(SConn* pConn) {
}
pHead->code = htonl(pHead->code);
int32_t dlen = 0;
SRpcMsg rpcMsg;
pHead = rpcDecompressRpcMsg(pHead);
if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later
// pHead = rpcDecompressRpcMsg(pHead);
} else {
// impl later
}
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.ahandle = NULL;
rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
......@@ -265,13 +263,13 @@ static void uvProcessData(SConn* pConn) {
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SConn* ctx = cli->data;
SConnBuffer* pBuf = &ctx->connBuf;
SConn* conn = cli->data;
SConnBuffer* pBuf = &conn->connBuf;
if (nread > 0) {
pBuf->len += nread;
if (readComplete(pBuf)) {
tDebug("alread read complete packet");
uvProcessData(ctx);
uvProcessData(conn);
} else {
tDebug("read half packet, continue to read");
}
......@@ -423,7 +421,7 @@ void* workerThread(void* arg) {
uv_loop_init(pThrd->loop);
// SRpcInfo* pRpc = pThrd->shandle;
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_init(pThrd->loop, pThrd->pipe, 0);
uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd;
......@@ -491,6 +489,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
......@@ -522,6 +521,22 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return srv;
}
void taosCloseServer(void* arg) {
// impl later
SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrdObj* pThrd = srv->pThreadObj[i];
pthread_join(pThrd->thread, NULL);
free(srv->pipe[i]);
free(pThrd->loop);
free(pThrd);
}
free(srv->loop);
free(srv->pipe);
free(srv->pThreadObj);
pthread_join(srv->thread, NULL);
free(srv);
}
void rpcSendResponse(const SRpcMsg* pMsg) {
SConn* pConn = pMsg->handle;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "rpcLog.h"
#include "trpc.h"
#include "taoserror.h"
typedef struct {
int index;
SRpcEpSet epSet;
int num;
int numOfReqs;
int msgSize;
tsem_t rspSem;
tsem_t *pOverSem;
pthread_t thread;
void *pRpc;
} SInfo;
static int tcount = 0;
static int terror = 0;
static void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param;
SRpcMsg rpcMsg, rspMsg;
tDebug("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.handle = pInfo;
rpcMsg.msgType = 1;
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &rspMsg);
// handle response
if (rspMsg.code != 0) terror++;
tDebug("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
rpcFreeCont(rspMsg.pCont);
if ( pInfo->num % 20000 == 0 )
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
}
tDebug("thread:%d, it is over", pInfo->index);
tcount++;
return NULL;
}
int main(int argc, char *argv[]) {
SRpcInit rpcInit;
SRpcEpSet epSet;
int msgSize = 128;
int numOfReqs = 0;
int appThreads = 1;
char serverIp[40] = "127.0.0.1";
char secret[TSDB_KEY_LEN] = "mypassword";
struct timeval systemTime;
int64_t startTime, endTime;
pthread_attr_t thattr;
// server info
epSet.numOfEps = 1;
epSet.inUse = 0;
epSet.port[0] = 7000;
epSet.port[1] = 7000;
strcpy(epSet.fqdn[0], serverIp);
strcpy(epSet.fqdn[1], "192.168.0.1");
// client info
memset(&rpcInit, 0, sizeof(rpcInit));
//rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0;
rpcInit.label = "APP";
rpcInit.numOfThreads = 1;
rpcInit.sessions = 100;
rpcInit.idleTime = tsShellActivityTimer*1000;
rpcInit.user = "michael";
rpcInit.secret = secret;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
epSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
tstrncpy(epSet.fqdn[0], argv[++i], sizeof(epSet.fqdn[0]));
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
msgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
rpcInit.sessions = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
numOfReqs = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
appThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
tsCompressMsgSize = atoi(argv[++i]);
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
rpcInit.user = argv[++i];
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
rpcInit.secret = argv[++i];
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
rpcInit.spi = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", epSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("client.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
tError("failed to initialize RPC");
return -1;
}
tInfo("client is initialized");
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
for (int i=0; i<appThreads; ++i) {
pInfo->index = i;
pInfo->epSet = epSet;
pInfo->numOfReqs = numOfReqs;
pInfo->msgSize = msgSize;
tsem_init(&pInfo->rspSem, 0, 0);
pInfo->pRpc = pRpc;
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
pInfo++;
}
do {
usleep(1);
} while ( tcount < appThreads);
gettimeofday(&systemTime, NULL);
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime)/1000.0; // milliseconds
tInfo("it takes %.3f milliseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
taosCloseLog();
return 0;
}
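For reference, the client test above is driven entirely by the command-line flags it parses; assuming the binary is built as rclient (the name is not fixed by this file), a run against a local server on port 7000 could look like:

  ./rclient -i 127.0.0.1 -p 7000 -t 2 -a 4 -n 100000 -m 128

which spawns 4 app threads, each issuing 100000 synchronous 128-byte requests over 2 rpc threads, and reports the aggregate request rate at the end.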
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "task.h"
#include <uv.h>
#define NUM_OF_THREAD 1
#define TIMEOUT 10000
typedef struct SThreadObj {
pthread_t thread;
uv_pipe_t *pipe;
uv_loop_t *loop;
uv_async_t *workerAsync; //
int fd;
} SThreadObj;
typedef struct SServerObj {
uv_tcp_t server;
uv_loop_t *loop;
int workerIdx;
int numOfThread;
SThreadObj **pThreadObj;
uv_pipe_t **pipe;
} SServerObj;
typedef struct SConnCtx {
uv_tcp_t *pClient;
uv_timer_t *pTimer;
uv_async_t *pWorkerAsync;
int ref;
} SConnCtx;
void echo_write(uv_write_t *req, int status) {
if (status < 0) {
fprintf(stderr, "Write error %s\n", uv_err_name(status));
}
printf("write data to client\n");
free(req);
}
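child_on_new_connection below arms a per-connection uv_timer_t with timeOutCallBack, which this file never defines. A minimal sketch of the missing callback, assuming a timeout should simply stop the timer and log (closing the idle client would need the owning SConnCtx, which the code does not wire into the timer's data pointer):

// Hypothetical definition of the timer callback referenced below; illustrative only.
void timeOutCallBack(uv_timer_t *handle) {
  uv_timer_stop(handle);
  printf("connection timed out\n");
}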
void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
SConnCtx *pConn = container_of(client, SConnCtx, pClient);
pConn->ref += 1;
printf("read data %d\n", nread, buf->base, buf->len);
if (nread > 0) {
uv_write_t *req = (uv_write_t *)malloc(sizeof(uv_write_t));
// dispatch request to database other process thread
// just write out
uv_buf_t write_out;
write_out.base = buf->base;
write_out.len = nread;
uv_write((uv_write_t *)req, client, &write_out, 1, echo_write);
free(buf->base);
return;
}
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t *)client, NULL);
}
free(buf->base);
}
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
void on_new_connection(uv_stream_t *s, int status) {
if (status == -1) {
// error!
return;
}
SServerObj *pObj = container_of(s, SServerObj, server);
printf("new_connection from client\n");
uv_tcp_t *client = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, client);
if (uv_accept(s, (uv_stream_t *)client) == 0) {
uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
uv_buf_t dummy_buf = uv_buf_init("a", 1);
// dispatch to worker thread
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
uv_write2(write_req, (uv_stream_t *)&(pObj->pipe[pObj->workerIdx][0]),
&dummy_buf, 1, (uv_stream_t *)client, echo_write);
} else {
uv_close((uv_handle_t *)client, NULL);
}
}
void child_on_new_connection(uv_stream_t *q, ssize_t nread,
const uv_buf_t *buf) {
printf("x child_on_new_connection \n");
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t *)q, NULL);
return;
}
SThreadObj *pObj = (SThreadObj *)container_of(q, struct SThreadObj, pipe);
uv_pipe_t *pipe = (uv_pipe_t *)q;
if (!uv_pipe_pending_count(pipe)) {
fprintf(stderr, "No pending count\n");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SConnCtx *pConn = malloc(sizeof(SConnCtx));
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pClient = (uv_tcp_t *)malloc(sizeof(uv_tcp_t));
pConn->pWorkerAsync = pObj->workerAsync; // thread safety
uv_tcp_init(pObj->loop, pConn->pClient);
if (uv_accept(q, (uv_stream_t *)(pConn->pClient)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t *)pConn->pClient, &fd);
fprintf(stderr, "Worker Accepted fd %d\n", fd);
uv_timer_start(pConn->pTimer, timeOutCallBack, TIMEOUT, 0);
uv_read_start((uv_stream_t *)(pConn->pClient), alloc_buffer, echo_read);
} else {
uv_timer_stop(pConn->pTimer);
free(pConn->pTimer);
uv_close((uv_handle_t *)pConn->pClient, NULL);
free(pConn->pClient);
free(pConn);
}
}
static void workerAsyncCallback(uv_async_t *handle) {
SThreadObj *pObj = container_of(handle, SThreadObj, workerAsync);
// do nothing
}
void *worker_thread(void *arg) {
SThreadObj *pObj = (SThreadObj *)arg;
int fd = pObj->fd;
pObj->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, fd);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCallback);
uv_read_start((uv_stream_t *)pObj->pipe, alloc_buffer,
child_on_new_connection);
uv_run(pObj->loop, UV_RUN_DEFAULT);
}
int main() {
SServerObj *server = calloc(1, sizeof(SServerObj));
server->loop = (uv_loop_t *)malloc(sizeof(uv_loop_t));
server->numOfThread = NUM_OF_THREAD;
server->workerIdx = 0;
server->pThreadObj =
(SThreadObj **)calloc(server->numOfThread, sizeof(SThreadObj *));
server->pipe = (uv_pipe_t **)calloc(server->numOfThread, sizeof(uv_pipe_t *));
uv_loop_init(server->loop);
for (int i = 0; i < server->numOfThread; i++) {
server->pThreadObj[i] = (SThreadObj *)calloc(1, sizeof(SThreadObj));
server->pipe[i] = (uv_pipe_t *)calloc(2, sizeof(uv_pipe_t));
int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE,
UV_NONBLOCK_PIPE) != 0) {
return -1;
}
uv_pipe_init(server->loop, &(server->pipe[i][0]), 1);
uv_pipe_open(&(server->pipe[i][0]), fds[1]); // init write
server->pThreadObj[i]->fd = fds[0];
server->pThreadObj[i]->pipe = &(server->pipe[i][1]); // init read
int err = pthread_create(&(server->pThreadObj[i]->thread), NULL,
worker_thread, (void *)(server->pThreadObj[i]));
if (err == 0) {
printf("thread %d create\n", i);
} else {
printf("thread %d create failed", i);
}
}
uv_tcp_init(server->loop, &server->server);
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", 7000, &bind_addr);
uv_tcp_bind(&server->server, (const struct sockaddr *)&bind_addr, 0);
int err = 0;
if ((err = uv_listen((uv_stream_t *)&server->server, 128,
on_new_connection)) != 0) {
fprintf(stderr, "Listen error %s\n", uv_err_name(err));
return 2;
}
uv_run(server->loop, UV_RUN_DEFAULT);
return 0;
}
......@@ -26,6 +26,7 @@ char dbName[32] = "db";
char stbName[64] = "st";
int32_t numOfThreads = 1;
int64_t numOfTables = 200000;
int64_t startOffset = 0;
int32_t createTable = 1;
int32_t insertData = 0;
int32_t batchNumOfTbl = 100;
......@@ -84,7 +85,7 @@ void createDbAndStb() {
}
taos_free_result(pRes);
sprintf(qstr, "create table %s (ts timestamp, i int) tags (j bigint)", stbName);
sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j int)", stbName);
pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if (code != 0) {
......@@ -181,8 +182,19 @@ void *threadFunc(void *param) {
exit(1);
}
// printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex,
// pInfo->tableEndIndex);
pError("====before thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pInfo->tableBeginIndex,
pInfo->tableEndIndex);
pInfo->tableBeginIndex += startOffset;
pInfo->tableEndIndex += startOffset;
pError("====after thread:%d, table range: %"PRId64 " - %"PRId64 "\n",
pInfo->threadIndex,
pInfo->tableBeginIndex,
pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pRes);
......@@ -210,7 +222,7 @@ void *threadFunc(void *param) {
TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
pError("failed to create table reason:%s, sql: %s", tstrerror(code), qstr);
}
taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs();
......@@ -296,6 +308,8 @@ void printHelp() {
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads);
printf("%s%s\n", indent, "-n");
printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables);
printf("%s%s\n", indent, "-g");
printf("%s%s%s%" PRId64 "\n", indent, indent, "startOffset, default is ", startOffset);
printf("%s%s\n", indent, "-v");
printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups);
printf("%s%s\n", indent, "-a");
......@@ -329,6 +343,8 @@ void parseArgument(int32_t argc, char *argv[]) {
numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfTables = atoll(argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) {
startOffset = atoll(argv[++i]);
} else if (strcmp(argv[i], "-v") == 0) {
numOfVgroups = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) {
......@@ -352,6 +368,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s stbName:%s %s", GREEN, stbName, NC);
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
pPrint("%s startOffset:%" PRId64 " %s", GREEN, startOffset, NC);
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
pPrint("%s createTable:%d %s", GREEN, createTable, NC);
......@@ -381,7 +398,7 @@ int32_t main(int32_t argc, char *argv[]) {
createDbAndStb();
}
pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables);
pPrint("%d threads are spawned to create %" PRId64 " tables, offset is %" PRId64 " ", numOfThreads, numOfTables, startOffset);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
......@@ -407,7 +424,7 @@ int32_t main(int32_t argc, char *argv[]) {
int64_t tableFrom = 0;
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = tableFrom;
pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
pInfo[i].tableEndIndex = (i < b ? tableFrom + a : tableFrom + a - 1);
tableFrom = pInfo[i].tableEndIndex + 1;
pInfo[i].threadIndex = i;
pInfo[i].minDelay = INT64_MAX;
......
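The new -g/startOffset option shifts each thread's table index range, so several instances of the tool can create disjoint ranges of the same super table. Assuming the binary is built as createTable (the name is not fixed by this diff), two cooperating runs could look like:

  ./createTable -n 100000 -g 0      -t 8
  ./createTable -n 100000 -g 100000 -t 8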