提交 6dc19fcf 编写于 作者: H hjxilinx

Merge branch 'feature/query' into feature/mpeer

...@@ -45,9 +45,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -45,9 +45,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pSql->signature = pSql; pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_VNODES_SUPPORT;
pSql->fp = fp; pSql->fp = fp;
pSql->param = param;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload"); tscError("failed to malloc payload");
......
...@@ -270,7 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -270,7 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
} else { } else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; // todo move away
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
...@@ -327,7 +326,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -327,7 +326,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
pMsg->code = htonl(pMsg->code); pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows); pMsg->numOfRows = htonl(pMsg->numOfRows);
......
...@@ -129,6 +129,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -129,6 +129,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pSql->maxRetry = TSDB_VNODES_SUPPORT;
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
pObj->pSql = pSql; pObj->pSql = pSql;
......
...@@ -171,17 +171,49 @@ void taos_init_imp() { ...@@ -171,17 +171,49 @@ void taos_init_imp() {
if(0 == tscEmbedded){ if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int64_t refreshTime = tsMetricMetaKeepTimer < tsMeterMetaKeepTimer ? tsMetricMetaKeepTimer : tsMeterMetaKeepTimer; int64_t refreshTime = tsMetricMetaKeepTimer < tsMeterMetaKeepTimer ? tsMetricMetaKeepTimer : tsMeterMetaKeepTimer;
refreshTime = refreshTime > 2 ? 2 : refreshTime; refreshTime = refreshTime > 2 ? 2 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime; refreshTime = refreshTime < 1 ? 1 : refreshTime;
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); if (tscCacheHandle == NULL) {
tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
}
tscTrace("client is initialized successfully"); tscTrace("client is initialized successfully");
} }
void taos_init() { pthread_once(&tscinit, taos_init_imp); } void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscCacheHandle != NULL) {
taosCacheCleanup(tscCacheHandle);
}
if (tscQhandle != NULL) {
taosCleanUpScheduler(tscQhandle);
tscQhandle = NULL;
}
taosCloseLogger();
if (pVnodeConn != NULL) {
rpcClose(pVnodeConn);
pVnodeConn = NULL;
}
if (pTscMgmtConn != NULL) {
rpcClose(pTscMgmtConn);
pTscMgmtConn = NULL;
}
if (tsGlobalConfig != NULL) {
tfree(tsGlobalConfig);
}
taosTmrCleanUp(tscTmr);
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) { static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
SGlobalConfig *cfg = NULL; SGlobalConfig *cfg = NULL;
......
...@@ -52,6 +52,7 @@ typedef struct taosField { ...@@ -52,6 +52,7 @@ typedef struct taosField {
#endif #endif
DLL_EXPORT void taos_init(); DLL_EXPORT void taos_init();
DLL_EXPORT void taos_cleanup();
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos); DLL_EXPORT void taos_close(TAOS *taos);
......
...@@ -71,7 +71,7 @@ extern void* shellLoopQuery(void* arg); ...@@ -71,7 +71,7 @@ extern void* shellLoopQuery(void* arg);
extern void taos_error(TAOS* con); extern void taos_error(TAOS* con);
extern int regex_match(const char* s, const char* reg, int cflags); extern int regex_match(const char* s, const char* reg, int cflags);
void shellReadCommand(TAOS* con, char command[]); void shellReadCommand(TAOS* con, char command[]);
void shellRunCommand(TAOS* con, char* command); int32_t shellRunCommand(TAOS* con, char* command);
void shellRunCommandOnServer(TAOS* con, char command[]); void shellRunCommandOnServer(TAOS* con, char command[]);
void read_history(); void read_history();
void write_history(); void write_history();
......
...@@ -166,10 +166,10 @@ void shellReplaceCtrlChar(char *str) { ...@@ -166,10 +166,10 @@ void shellReplaceCtrlChar(char *str) {
*pstr = '\0'; *pstr = '\0';
} }
void shellRunCommand(TAOS *con, char *command) { int32_t shellRunCommand(TAOS *con, char *command) {
/* If command is empty just return */ /* If command is empty just return */
if (regex_match(command, "^[ \t;]*$", REG_EXTENDED)) { if (regex_match(command, "^[ \t;]*$", REG_EXTENDED)) {
return; return 0;
} }
/* Update the history vector. */ /* Update the history vector. */
...@@ -193,11 +193,11 @@ void shellRunCommand(TAOS *con, char *command) { ...@@ -193,11 +193,11 @@ void shellRunCommand(TAOS *con, char *command) {
if (regex_match(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) { if (regex_match(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
taos_close(con); taos_close(con);
write_history(); write_history();
exitShell(); return -1;
} else if (regex_match(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) { } else if (regex_match(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
// If clear the screen. // If clear the screen.
system("clear"); system("clear");
return; return 0;
} else if (regex_match(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) { } else if (regex_match(command, "^[ \t]*source[\t ]+[^ ]+[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
/* If source file. */ /* If source file. */
char *c_ptr = strtok(command, " ;"); char *c_ptr = strtok(command, " ;");
...@@ -209,6 +209,8 @@ void shellRunCommand(TAOS *con, char *command) { ...@@ -209,6 +209,8 @@ void shellRunCommand(TAOS *con, char *command) {
} else { } else {
shellRunCommandOnServer(con, command); shellRunCommandOnServer(con, command);
} }
return 0;
} }
void shellRunCommandOnServer(TAOS *con, char command[]) { void shellRunCommandOnServer(TAOS *con, char command[]) {
......
...@@ -295,6 +295,7 @@ void *shellLoopQuery(void *arg) { ...@@ -295,6 +295,7 @@ void *shellLoopQuery(void *arg) {
tscError("failed to malloc command"); tscError("failed to malloc command");
return NULL; return NULL;
} }
while (1) { while (1) {
// Read command from shell. // Read command from shell.
...@@ -304,9 +305,14 @@ void *shellLoopQuery(void *arg) { ...@@ -304,9 +305,14 @@ void *shellLoopQuery(void *arg) {
reset_terminal_mode(); reset_terminal_mode();
// Run the command // Run the command
shellRunCommand(con, command); if (shellRunCommand(con, command) != 0) {
break;
}
} }
tfree(command);
exitShell();
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
return NULL; return NULL;
...@@ -487,6 +493,7 @@ void showOnScreen(Command *cmd) { ...@@ -487,6 +493,7 @@ void showOnScreen(Command *cmd) {
void cleanup_handler(void *arg) { tcsetattr(0, TCSANOW, &oldtio); } void cleanup_handler(void *arg) { tcsetattr(0, TCSANOW, &oldtio); }
void exitShell() { void exitShell() {
tcsetattr(0, TCSANOW, &oldtio); /*int32_t ret =*/ tcsetattr(STDIN_FILENO, TCSANOW, &oldtio);
taos_cleanup();
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
...@@ -95,7 +95,7 @@ int main(int argc, char* argv[]) { ...@@ -95,7 +95,7 @@ int main(int argc, char* argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
/* Interupt handler. */ /* Interrupt handler. */
struct sigaction act; struct sigaction act;
memset(&act, 0, sizeof(struct sigaction)); memset(&act, 0, sizeof(struct sigaction));
......
...@@ -101,8 +101,6 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) { ...@@ -101,8 +101,6 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
if (pNode->next) { if (pNode->next) {
(pNode->next)->prev = pNode; (pNode->next)->prev = pNode;
} }
pTrace("key:%s %p update hash table", pNode->key, pNode);
} }
/** /**
...@@ -153,18 +151,18 @@ static void taosHashTableResize(SHashObj *pHashObj) { ...@@ -153,18 +151,18 @@ static void taosHashTableResize(SHashObj *pHashObj) {
SHashNode *pNode = NULL; SHashNode *pNode = NULL;
SHashNode *pNext = NULL; SHashNode *pNext = NULL;
int32_t newSize = pHashObj->capacity << 1U; int32_t newSize = pHashObj->capacity << 1u;
if (newSize > HASH_MAX_CAPACITY) { if (newSize > HASH_MAX_CAPACITY) {
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", // pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
pHashObj->capacity, HASH_MAX_CAPACITY); // pHashObj->capacity, HASH_MAX_CAPACITY);
return; return;
} }
int64_t st = taosGetTimestampUs(); // int64_t st = taosGetTimestampUs();
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize); SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
if (pNewEntry == NULL) { if (pNewEntry == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); // pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return; return;
} }
...@@ -230,10 +228,9 @@ static void taosHashTableResize(SHashObj *pHashObj) { ...@@ -230,10 +228,9 @@ static void taosHashTableResize(SHashObj *pHashObj) {
} }
} }
int64_t et = taosGetTimestampUs(); // int64_t et = taosGetTimestampUs();
// pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册