提交 962ed386 编写于 作者: dengyihao's avatar dengyihao

stmt reqid incr one

上级 1102a88a
...@@ -26,21 +26,40 @@ TdFilePtr pDataFile = NULL; ...@@ -26,21 +26,40 @@ TdFilePtr pDataFile = NULL;
STaosQueue *qhandle = NULL; STaosQueue *qhandle = NULL;
STaosQset *qset = NULL; STaosQset *qset = NULL;
int32_t balance = 0;
typedef struct {
int32_t numOfThread;
STaosQueue **qhandle;
STaosQset **qset;
} MultiThreadQhandle;
typedef struct TThread {
TdThread thread;
int idx;
} TThread;
MultiThreadQhandle *multiQ = NULL;
void initLogEnv() { void initLogEnv() {
const char *logDir = "/tmp/trans_svr"; const char *logDir = "/tmp/trans_svr";
const char* defaultLogFileNamePrefix = "taoslog"; const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10000; const int32_t maxLogFileNum = 10000;
tsAsyncLog = 0; tsAsyncLog = 0;
//idxDebugFlag = 143; // idxDebugFlag = 143;
strcpy(tsLogDir, logDir); strcpy(tsLogDir, logDir);
taosRemoveDir(tsLogDir); taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir); taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir); printf("failed to open log file in directory:%s\n", tsLogDir);
} }
} }
void processShellMsg() { void *processShellMsg(void *arg) {
TThread *thread = (TThread *)arg;
int32_t idx = thread->idx;
static int num = 0; static int num = 0;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
...@@ -50,7 +69,7 @@ void processShellMsg() { ...@@ -50,7 +69,7 @@ void processShellMsg() {
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &qinfo); int numOfMsgs = taosReadAllQitemsFromQset(multiQ->qset[idx], qall, &qinfo);
tDebug("%d shell msgs are received", numOfMsgs); tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break; if (numOfMsgs <= 0) break;
...@@ -103,6 +122,7 @@ void processShellMsg() { ...@@ -103,6 +122,7 @@ void processShellMsg() {
} }
taosFreeQall(qall); taosFreeQall(qall);
return NULL;
} }
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...@@ -111,8 +131,11 @@ void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -111,8 +131,11 @@ void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
int32_t idx = balance % multiQ->numOfThread;
tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, pTemp); taosWriteQitem(multiQ->qhandle[idx], pTemp);
balance++;
if (balance >= multiQ->numOfThread) balance = 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
...@@ -162,7 +185,7 @@ int main(int argc, char *argv[]) { ...@@ -162,7 +185,7 @@ int main(int argc, char *argv[]) {
} }
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
initLogEnv(); initLogEnv();
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
...@@ -178,16 +201,35 @@ int main(int argc, char *argv[]) { ...@@ -178,16 +201,35 @@ int main(int argc, char *argv[]) {
pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE); pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE);
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
} }
qhandle = taosOpenQueue();
qset = taosOpenQset();
taosAddIntoQset(qset, qhandle, NULL);
processShellMsg(); int32_t numOfAthread = 5;
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
multiQ->numOfThread = numOfAthread;
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
for (int i = 0; i < numOfAthread; i++) {
multiQ->qhandle[i] = taosOpenQueue();
multiQ->qset[i] = taosOpenQset();
taosAddIntoQset(multiQ->qset[i], multiQ->qhandle[i], NULL);
}
TThread *threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
for (int i = 0; i < numOfAthread; i++) {
threads[i].idx = i;
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
}
// qhandle = taosOpenQueue();
// qset = taosOpenQset();
// taosAddIntoQset(qset, qhandle, NULL);
// processShellMsg();
if (pDataFile != NULL) { if (pDataFile != NULL) {
taosCloseFile(&pDataFile); taosCloseFile(&pDataFile);
taosRemoveFile(dataName); taosRemoveFile(dataName);
} }
int ch = getchar();
UNUSED(ch);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册