提交 ef242d0a 编写于 作者: A Alex Duan

feat(shell): autotab push to 2.6 branch

上级 9623e620
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __SHELL_AUTO__
#define __SHELL_AUTO__
#define TAB_KEY 0x09
// press tab key
void pressTabKey(TAOS * con, Command * cmd);
// press othr key
void pressOtherKey(char c);
// init shell auto funciton , shell start call once
bool shellAutoInit();
// exit shell auto funciton, shell exit call once
void shellAutoExit();
// callback autotab module
void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb);
#endif
......@@ -41,6 +41,7 @@ extern void deleteChar(Command *cmd);
extern void moveCursorLeft(Command *cmd);
extern void moveCursorRight(Command *cmd);
extern void positionCursorHome(Command *cmd);
extern void positionCursorMiddle(Command *cmd);
extern void positionCursorEnd(Command *cmd);
extern void showOnScreen(Command *cmd);
extern void updateBuffer(Command *cmd);
......@@ -51,5 +52,6 @@ int countPrefixOnes(unsigned char c);
void clearScreen(int ecmd_pos, int cursor_pos);
void printChar(char c, int times);
void positionCursor(int step, int direction);
void getPrevCharSize(const char *str, int pos, int *size, int *width);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __TRIE__
#define __TRIE__
//
// The prefix search tree is a efficient storage words and search words tree, it support 95 visible ascii code character
//
#define FIRST_ASCII 40 // first visiable char is '0'
#define LAST_ASCII 122 // last visilbe char is 'z'
// capacity save char is 95
#define CHAR_CNT (LAST_ASCII - FIRST_ASCII + 1)
#define MAX_WORD_LEN 256 // max insert word length
// define STire
#define TIRE_TREE 0
#define TIRE_LIST 1
typedef struct STireNode {
struct STireNode** d;
bool end; // record end flag
}STireNode;
typedef struct StrName {
char * name;
struct StrName * next;
}StrName;
typedef struct STire {
char type; // see define TIRE_
STireNode root;
StrName * head;
StrName * tail;
int count; // all count
int ref;
}STire;
typedef struct SMatchNode {
char* word;
struct SMatchNode* next;
}SMatchNode;
typedef struct SMatch {
SMatchNode* head;
SMatchNode* tail; // append node to tail
int count;
char pre[MAX_WORD_LEN];
}SMatch;
// ----------- interface -------------
// create prefix search tree, return value call freeTire to free
STire* createTire(char type);
// destroy prefix search tree
void freeTire(STire* tire);
// add a new word
bool insertWord(STire* tire, char* word);
// add a new word
bool deleteWord(STire* tire, char* word);
// match prefix words, if match is not NULL , put all item to match and return match
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match);
// get all items from tires tree
SMatch* enumAll(STire* tire);
// free match result
void freeMatch(SMatch* match);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define __USE_XOPEN
#include "os.h"
#include "tglobal.h"
#include "shell.h"
#include "shellCommand.h"
#include "tkey.h"
#include "tulog.h"
#include "shellAuto.h"
#include "tire.h"
#include "tthread.h"
//
// ------------- define area ---------------
//
#define UNION_ALL " union all "
// extern function
void insertChar(Command *cmd, char *c, int size);
typedef struct SAutoPtr {
STire* p;
int ref;
}SAutoPtr;
typedef struct SWord{
int type ; // word type , see WT_ define
char * word;
int32_t len;
struct SWord * next;
bool free; // if true need free
}SWord;
typedef struct {
char * source;
int32_t source_len; // valid data length in source
int32_t count;
SWord* head;
// matched information
int32_t matchIndex; // matched word index in words
int32_t matchLen; // matched length at matched word
}SWords;
SWords shellCommands[] = {
{"alter database <db_name> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword>", 0, 0, NULL},
{"alter dnode <dnode_id> balance ", 0, 0, NULL},
{"alter dnode <dnode_id> resetlog;", 0, 0, NULL},
{"alter dnode <dnode_id> debugFlag 141;", 0, 0, NULL},
{"alter dnode <dnode_id> monitor 1;", 0, 0, NULL},
{"alter table <tb_name> <tb_actions>", 0, 0, NULL},
{"alter table modify column", 0, 0, NULL},
{"alter topic", 0, 0, NULL},
{"alter user <user_name> pass", 0, 0, NULL},
{"alter user <user_name> privilege read", 0, 0, NULL},
{"alter user <user_name> privilege write", 0, 0, NULL},
{"create table <anyword> using <stb_name> tags(", 0, 0, NULL},
{"create database ", 0, 0, NULL},
{"create table <anyword> as ", 0, 0, NULL},
{"create dnode ", 0, 0, NULL},
{"create topic", 0, 0, NULL},
{"create function ", 0, 0, NULL},
{"create user <anyword> pass", 0, 0, NULL},
{"compact vnode in", 0, 0, NULL},
{"describe <all_table>", 0, 0, NULL},
#ifdef TD_ENTERPRISE
{"delete from <all_table> where", 0, 0, NULL},
#endif
{"drop database <db_name>", 0, 0, NULL},
{"drop dnode <dnode_id>", 0, 0, NULL},
{"drop function", 0, 0, NULL},
{"drop topic", 0, 0, NULL},
{"drop table <all_table>;", 0, 0, NULL},
{"drop user <user_name>;", 0, 0, NULL},
{"kill connection", 0, 0, NULL},
{"kill query", 0, 0, NULL},
{"kill stream", 0, 0, NULL},
{"select * from <all_table> where ", 0, 0, NULL},
{"select _block_dist() from <all_table> \\G;", 0, 0, NULL},
{"select client_version();", 0, 0, NULL},
{"select current_user();", 0, 0, NULL},
{"select database;", 0, 0, NULL},
{"select server_version();", 0, 0, NULL},
{"set max_binary_display_width ", 0, 0, NULL},
{"show create database <db_name> \\G;", 0, 0, NULL},
{"show create stable <stb_name> \\G;", 0, 0, NULL},
{"show create table <tb_name> \\G;", 0, 0, NULL},
{"show connections;", 0, 0, NULL},
{"show databases;", 0, 0, NULL},
{"show dnodes;", 0, 0, NULL},
{"show functions;", 0, 0, NULL},
{"show modules;", 0, 0, NULL},
{"show mnodes;", 0, 0, NULL},
{"show queries;", 0, 0, NULL},
{"show stables;", 0, 0, NULL},
{"show stables like ", 0, 0, NULL},
{"show streams;", 0, 0, NULL},
{"show scores;", 0, 0, NULL},
{"show tables;", 0, 0, NULL},
{"show tables like", 0, 0, NULL},
{"show users;", 0, 0, NULL},
{"show variables;", 0, 0, NULL},
{"show vgroups;", 0, 0, NULL},
{"insert into <tb_name> values(", 0, 0, NULL},
{"use <db_name>", 0, 0, NULL}
};
char * keywords[] = {
"and ",
"asc ",
"desc ",
"from ",
"fill(",
"limit ",
"where ",
"interval(",
"order by ",
"order by ",
"offset ",
"or ",
"group by ",
"now()",
"session(",
"sliding ",
"slimit ",
"soffset ",
"state_window(",
"today() ",
"union all select ",
};
char * functions[] = {
"count(",
"sum(",
"avg(",
"last(",
"last_row(",
"top(",
"interp(",
"max(",
"min(",
"now()",
"today()",
"percentile(",
"tail(",
"pow(",
"abs(",
"atan(",
"acos(",
"asin(",
"apercentile(",
"bottom(",
"cast(",
"ceil(",
"char_length(",
"cos(",
"concat(",
"concat_ws(",
"csum(",
"diff(",
"derivative(",
"elapsed(",
"first(",
"floor(",
"hyperloglog(",
"histogram(",
"irate(",
"leastsquares(",
"length(",
"log(",
"lower(",
"ltrim(",
"mavg(",
"mode(",
"tan(",
"round(",
"rtrim(",
"sample(",
"sin(",
"spread(",
"substr(",
"statecount(",
"stateduration(",
"stddev(",
"sqrt(",
"timediff(",
"timezone(",
"timetruncate(",
"twa(",
"to_unixtimestamp(",
"unique(",
"upper(",
};
char * tb_actions[] = {
"add column",
"modify column",
"drop column",
"change tag",
};
char * db_options[] = {
"blocks",
"cachelast",
"comp",
"keep",
"replica",
"quorum",
};
char * data_types[] = {
"timestamp",
"int",
"bigint",
"float",
"double",
"binary",
"smallint",
"tinyint",
"bool",
"nchar",
"json"
};
char * key_tags[] = {
"tags("
};
//
// ------- gobal variant define ---------
//
int32_t firstMatchIndex = -1; // first match shellCommands index
int32_t lastMatchIndex = -1; // last match shellCommands index
int32_t curMatchIndex = -1; // current match shellCommands index
int32_t lastWordBytes = -1; // printShow last word length
bool waitAutoFill = false;
//
// ----------- global var array define -----------
//
#define WT_VAR_DBNAME 0
#define WT_VAR_STABLE 1
#define WT_VAR_TABLE 2
#define WT_VAR_DNODEID 3
#define WT_VAR_USERNAME 4
#define WT_VAR_ALLTABLE 5
#define WT_VAR_FUNC 6
#define WT_VAR_KEYWORD 7
#define WT_VAR_TBACTION 8
#define WT_VAR_DBOPTION 9
#define WT_VAR_DATATYPE 10
#define WT_VAR_KEYTAGS 11
#define WT_VAR_ANYWORD 12
#define WT_VAR_CNT 13
#define WT_FROM_DB_MAX 4 // max get content from db
#define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
#define WT_TEXT 0xFF
char dbName[256] = ""; // save use database name;
// tire array
STire* tires[WT_VAR_CNT];
pthread_mutex_t tiresMutex;
//save thread handle obtain var name from db server
pthread_t* threads[WT_FROM_DB_CNT];
// obtain var name with sql from server
char varTypes[WT_VAR_CNT][64] = {
"<db_name>",
"<stb_name>",
"<tb_name>",
"<dnode_id>",
"<user_name>",
"<all_table>",
"<function>",
"<keyword>",
"<tb_actions>",
"<db_options>",
"<data_types>",
"<key_tags>",
"<anyword>"
};
char varSqls[WT_FROM_DB_CNT][64] = {
"show databases;",
"show stables;",
"show tables;",
"show dnodes;",
"show users;"
};
// var words current cursor, if user press any one key except tab, cursorVar can be reset to -1
int cursorVar = -1;
bool varMode = false; // enter var names list mode
TAOS* varCon = NULL;
Command* varCmd = NULL;
SMatch* lastMatch = NULL; // save last match result
int cntDel = 0; // delete byte count after next press tab
// show auto tab introduction
void printfIntroduction() {
printf(" **************************** SUPPORT TAB KEY ************************************\n");
printf(" * Taos shell support press TAB key to complete word. You can try it. *\n");
printf(" * Anywhere press SPACE key following TAB key, You'll get surprise. *\n");
printf(" * SUPPORT SHORTCUT: *\n");
printf(" * [ Ctrl + A ] ...... move cursor to line [A]head *\n");
printf(" * [ Ctrl + M ] ...... move cursor to line [M]iddle *\n");
printf(" * [ Ctrl + E ] ...... move cursor to line [E]nd *\n");
printf(" * [ Ctrl + L ] ...... clean screen *\n");
printf(" * [ Ctrl + K ] ...... clean after cursor *\n");
printf(" * [ Ctrl + U ] ...... clean before cursor *\n");
printf(" * *\n");
printf(" **********************************************************************************\n\n");
}
void showHelp() {
printf("\nThe following are supported commands for Taos shell:");
printf("\n\
----- A ----- \n\
alter database <db_name> <db_options> \n\
alter dnode <dnode_id> balance \n\
alter dnode <dnode_id> resetlog;\n\
alter dnode <dnode_id> debugFlag 141;\n\
alter dnode <dnode_id> monitor 1;\n\
alter table <tb_name> ADD COLUMN <field_name> <data_type>; \n\
alter table <tb_name> DROP COLUMN <field_name>; \n\
alter table <tb_name> MODIFY COLUMN <field_name> <data_type(length)>;\n\
alter topic <topic_name>\n\
alter user <user_name> pass\n\
alter user <user_name> privilege read ;\n\
alter user <user_name> privilege write ;\n\
----- C ----- \n\
create table <tb_name> using <stb_name> tags ...\n\
create database <db_name>;\n\
create table <anyword> as ...\n\
create dnode <dnode_id>\n\
create topic <top_name>\n\
create function <function_name>\n\
create user <user_name> pass <password>;\n\
compact vnode in (vgid,vgid,vgid);\n\
----- D ----- \n\
describe <all_table> ;\n\
delete from <all_table> where ... \n\
drop database <db_name>;\n\
drop dnode <dnode_id>;\n\
drop function <function_id>;\n\
drop topic <topic_id>;\n\
drop table <all_table>;\n\
drop user <user_name>;\n\
----- K ----- \n\
kill connection <connection_id>; \n\
kill query <query_id>; \n\
kill stream <stream_id>; \n\
----- S ----- \n\
select * from <all_table> where ... \n\
select _block_dist() from <all_table>;\n\
select client_version();\n\
select current_user();\n\
select database;\n\
select server_version();\n\
set max_binary_display_width <width>; \n\
show create database <db_name>;\n\
show create stable <stb_name>;\n\
show create table <tb_name>;\n\
show connections;\n\
show databases;\n\
show dnodes;\n\
show functions;\n\
show modules;\n\
show mnodes;\n\
show queries;\n\
show stables;\n\
show stables like '<regular expression>'; note: regular expression only support '_' and '%%' match.\n\
show streams;\n\
show scores;\n\
show tables;\n\
show tables like '<regular expression>'; \n\
show users;\n\
show variables;\n\
show vgroups;\n\
----- I ----- \n\
insert into <tb_name> values(...) ;\n\
----- U ----- \n\
use <db_name>;");
printf("\n\n");
//define in getDuration() function
printf("\
Timestamp expression Format:\n\
b - nanosecond \n\
u - microsecond \n\
a - millisecond \n\
s - second \n\
m - minute \n\
h - hour \n\
d - day \n\
w - week \n\
now - current time \n\
Example : \n\
select * from t1 where ts > now - 2w + 3d and ts <= now - 1w -2h ;\n");
printf("\n");
}
//
// ------------------- parse words --------------------------
//
#define SHELL_COMMAND_COUNT() (sizeof(shellCommands) / sizeof(SWords))
// get at
SWord * atWord(SWords * command, int32_t index) {
SWord * word = command->head;
for (int32_t i = 0; i < index; i++) {
if (word == NULL)
return NULL;
word = word->next;
}
return word;
}
#define MATCH_WORD(x) atWord(x, x->matchIndex)
int wordType(const char* p, int32_t len) {
for (int i = 0; i < WT_VAR_CNT; i++) {
if (strncmp(p, varTypes[i], len) == 0)
return i;
}
return WT_TEXT;
}
// add word
SWord * addWord(const char* p, int32_t len, bool pattern) {
SWord* word = (SWord *) malloc(sizeof(SWord));
memset(word, 0, sizeof(SWord));
word->word = (char* )p;
word->len = len;
// check format
if (pattern) {
word->type = wordType(p, len);
} else {
word->type = WT_TEXT;
}
return word;
}
// parse one command
void parseCommand(SWords * command, bool pattern) {
char * p = command->source;
int32_t start = 0;
int32_t size = command->source_len > 0 ? command->source_len : strlen(p);
bool lastBlank = false;
for (int i = 0; i <= size; i++) {
if (p[i] == ' ' || i == size) {
// check continue blank like ' '
if (p[i] == ' ') {
if (lastBlank) {
start ++;
continue;
}
if (i == 0) { // first blank
lastBlank = true;
start ++;
continue;
}
lastBlank = true;
}
// found split or string end , append word
if (command->head == NULL) {
command->head = addWord(p + start, i - start, pattern);
command->count = 1;
} else {
SWord * word = command->head;
while (word->next) {
word = word->next;
}
word->next = addWord(p + start, i - start, pattern);
command->count ++;
}
start = i + 1;
} else {
lastBlank = false;
}
}
}
// free Command
void freeCommand(SWords * command) {
SWord * word = command->head;
if (word == NULL) {
return ;
}
// loop
while (word->next) {
SWord * tmp = word;
word = word->next;
// if malloc need free
if(tmp->free && tmp->word)
free(tmp->word);
free(tmp);
}
// if malloc need free
if(word->free && word->word)
free(word->word);
free(word);
}
void GenerateVarType(int type, char** p, int count) {
STire* tire = createTire(TIRE_LIST);
for (int i = 0; i < count; i++) {
insertWord(tire, p[i]);
}
pthread_mutex_lock(&tiresMutex);
tires[type] = tire;
pthread_mutex_unlock(&tiresMutex);
}
//
// -------------------- shell auto ----------------
//
// init shell auto funciton , shell start call once
bool shellAutoInit() {
// command
int32_t count = SHELL_COMMAND_COUNT();
for (int32_t i = 0; i < count; i ++) {
parseCommand(shellCommands + i, true);
}
// tires
memset(tires, 0, sizeof(STire*) * WT_VAR_CNT);
pthread_mutex_init(&tiresMutex, NULL);
// threads
memset(threads, 0, sizeof(pthread_t*) * WT_VAR_CNT);
// generate varType
GenerateVarType(WT_VAR_FUNC, functions, sizeof(functions) /sizeof(char *));
GenerateVarType(WT_VAR_KEYWORD, keywords, sizeof(keywords) /sizeof(char *));
GenerateVarType(WT_VAR_DBOPTION, db_options, sizeof(db_options) /sizeof(char *));
GenerateVarType(WT_VAR_TBACTION, tb_actions, sizeof(tb_actions) /sizeof(char *));
GenerateVarType(WT_VAR_DATATYPE, data_types, sizeof(data_types) /sizeof(char *));
GenerateVarType(WT_VAR_KEYTAGS, key_tags, sizeof(key_tags) /sizeof(char *));
printfIntroduction();
return true;
}
// exit shell auto funciton, shell exit call once
void shellAutoExit() {
// free command
int32_t count = SHELL_COMMAND_COUNT();
for (int32_t i = 0; i < count; i ++) {
freeCommand(shellCommands + i);
}
// free tires
pthread_mutex_lock(&tiresMutex);
for (int32_t i = 0; i < WT_VAR_CNT; i++) {
if (tires[i]) {
freeTire(tires[i]);
tires[i] = NULL;
}
}
pthread_mutex_unlock(&tiresMutex);
// destory
pthread_mutex_destroy(&tiresMutex);
// free threads
for (int32_t i = 0; i < WT_VAR_CNT; i++) {
if (threads[i]) {
taosDestroyThread(threads[i]);
threads[i] = NULL;
}
}
// free lastMatch
if (lastMatch) {
freeMatch(lastMatch);
lastMatch = NULL;
}
}
//
// ------------------- auto ptr for tires --------------------------
//
bool setNewAuotPtr(int type, STire* pNew) {
if (pNew == NULL)
return false;
pthread_mutex_lock(&tiresMutex);
STire* pOld = tires[type];
if (pOld != NULL) {
// previous have value, release self ref count
if (--pOld->ref == 0) {
freeTire(pOld);
}
}
// set new
tires[type] = pNew;
tires[type]->ref = 1;
pthread_mutex_unlock(&tiresMutex);
return true;
}
// get ptr
STire* getAutoPtr(int type) {
if (tires[type] == NULL) {
return NULL;
}
pthread_mutex_lock(&tiresMutex);
tires[type]->ref++;
pthread_mutex_unlock(&tiresMutex);
return tires[type];
}
// put back tire to tires[type], if tire not equal tires[type].p, need free tire
void putBackAutoPtr(int type, STire* tire) {
if (tire == NULL) {
return ;
}
pthread_mutex_lock(&tiresMutex);
if (tires[type] != tire) {
//update by out, can't put back , so free
if (--tire->ref == 1) {
// support multi thread getAuotPtr
freeTire(tire);
}
} else {
tires[type]->ref--;
assert(tires[type]->ref > 0);
}
pthread_mutex_unlock(&tiresMutex);
return ;
}
//
// ------------------- var Word --------------------------
//
#define MAX_CACHED_CNT 100000 // max cached rows 10w
// write sql result to var name, return write rows cnt
int writeVarNames(int type, TAOS_RES* tres) {
// fetch row
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
}
TAOS_FIELD *fields = taos_fetch_fields(tres);
// create new tires
char tireType = type == WT_VAR_TABLE ? TIRE_TREE : TIRE_LIST;
STire* tire = createTire(tireType);
// enum rows
char name[1024];
int numOfRows = 0;
do {
int32_t* lengths = taos_fetch_lengths(tres);
int32_t bytes = lengths[0];
if(fields[0].type == TSDB_DATA_TYPE_SMALLINT) {
sprintf(name,"%d", *(int16_t*)row[0]);
} else {
memcpy(name, row[0], bytes);
}
name[bytes] = 0; //set string end
// insert to tire
insertWord(tire, name);
if (++numOfRows > MAX_CACHED_CNT ) {
break;
}
row = taos_fetch_row(tres);
} while (row != NULL);
// replace old tire
setNewAuotPtr(type, tire);
return numOfRows;
}
bool firstMatchCommand(TAOS * con, Command * cmd);
//
// thread obtain var thread from db server
//
void* varObtainThread(void* param) {
int type = *(int* )param;
free(param);
if (varCon == NULL || type > WT_FROM_DB_MAX) {
return NULL;
}
TAOS_RES* pSql = taos_query_h(varCon, varSqls[type], NULL);
if (taos_errno(pSql)) {
taos_free_result(pSql);
return NULL;
}
// write var names from pSql
int cnt = writeVarNames(type, pSql);
// free sql
taos_free_result(pSql);
// check need call auto tab
if (cnt > 0 && waitAutoFill) {
// press tab key by program
firstMatchCommand(varCon, varCmd);
}
return NULL;
}
// only match next one word from all match words, return valuue must free by caller
char* matchNextPrefix(STire* tire, char* pre) {
SMatch* match = NULL;
// re-use last result
if (lastMatch) {
if (strcmp(pre, lastMatch->pre) == 0) {
// same pre
match = lastMatch;
}
}
if (match == NULL) {
// not same with last result
if (pre[0] == 0) {
// EMPTY PRE
match = enumAll(tire);
} else {
// NOT EMPTY
match = matchPrefix(tire, pre, NULL);
}
// save to lastMatch
if (match) {
if (lastMatch)
freeMatch(lastMatch);
lastMatch = match;
}
}
// check valid
if (match == NULL || match->head == NULL) {
// no one matched
return false;
}
if (cursorVar == -1) {
// first
cursorVar = 0;
return strdup(match->head->word);
}
// according to cursorVar , calculate next one
int i = 0;
SMatchNode* item = match->head;
while (item) {
if (i == cursorVar + 1) {
// found next position ok
if (item->next == NULL) {
// match last item, reset cursorVar to head
cursorVar = -1;
} else {
cursorVar = i;
}
return strdup(item->word);
}
// check end item
if (item->next == NULL) {
// if cursorVar > var list count, return last and reset cursorVar
cursorVar = -1;
return strdup(item->word);
}
// move next
item = item->next;
i++;
}
return NULL;
}
// search pre word from tire tree, return value must free by caller
char* tireSearchWord(int type, char* pre) {
if (type == WT_TEXT) {
return NULL;
}
if(type > WT_FROM_DB_MAX) {
// NOT FROM DB , tires[type] alwary not null
STire* tire = tires[type];
if (tire == NULL)
return NULL;
return matchNextPrefix(tire, pre);
}
// TYPE CONTEXT GET FROM DB
pthread_mutex_lock(&tiresMutex);
// check need obtain from server
if (tires[type] == NULL) {
waitAutoFill = true;
// need async obtain var names from db sever
if (threads[type] != NULL) {
if (taosThreadRunning(threads[type])) {
// thread running , need not obtain again, return
pthread_mutex_unlock(&tiresMutex);
return NULL;
}
// destroy previous thread handle for new create thread handle
taosDestroyThread(threads[type]);
threads[type] = NULL;
}
// create new
void * param = malloc(sizeof(int));
*((int* )param) = type;
threads[type] = taosCreateThread(varObtainThread, param);
pthread_mutex_unlock(&tiresMutex);
return NULL;
}
pthread_mutex_unlock(&tiresMutex);
// can obtain var names from local
STire* tire = getAutoPtr(type);
if (tire == NULL) {
return NULL;
}
char* str = matchNextPrefix(tire, pre);
// used finish, put back pointer to autoptr array
putBackAutoPtr(type, tire);
return str;
}
// match var word, word1 is pattern , word2 is input from shell
bool matchVarWord(SWord* word1, SWord* word2) {
// search input word from tire tree
char pre[512];
memcpy(pre, word2->word, word2->len);
pre[word2->len] = 0;
char* str = NULL;
if (word1->type == WT_VAR_ALLTABLE) {
// ALL_TABLE
str = tireSearchWord(WT_VAR_STABLE, pre);
if (str == NULL) {
str = tireSearchWord(WT_VAR_TABLE, pre);
if(str == NULL)
return false;
}
} else {
// OTHER
str = tireSearchWord(word1->type, pre);
if (str == NULL) {
// not found or word1->type variable list not obtain from server, return not match
return false;
}
}
// free previous malloc
if(word1->free && word1->word) {
free(word1->word);
}
// save
word1->word = str;
word1->len = strlen(str);
word1->free = true; // need free
return true;
}
//
// ------------------- match words --------------------------
//
// compare command cmd1 come from shellCommands , cmd2 come from user input
int32_t compareCommand(SWords * cmd1, SWords * cmd2) {
SWord * word1 = cmd1->head;
SWord * word2 = cmd2->head;
if (word1 == NULL || word2 == NULL) {
return -1;
}
for (int32_t i = 0; i < cmd1->count; i++) {
if (word1->type == WT_TEXT) {
// WT_TEXT match
if (word1->len == word2->len) {
if (strncasecmp(word1->word, word2->word, word1->len) != 0)
return -1;
} else if (word1->len < word2->len) {
return -1;
} else {
// word1->len > word2->len
if (strncasecmp(word1->word, word2->word, word2->len) == 0) {
cmd1->matchIndex = i;
cmd1->matchLen = word2->len;
return i;
} else {
return -1;
}
}
} else {
// WT_VAR auto match any one word
if (word2->next == NULL) { // input words last one
if (matchVarWord(word1, word2)) {
cmd1->matchIndex = i;
cmd1->matchLen = word2->len;
varMode = true;
return i;
}
return -1;
}
}
// move next
word1 = word1->next;
word2 = word2->next;
if (word1 == NULL || word2 == NULL) {
return -1;
}
}
return -1;
}
// match command
SWords * matchCommand(SWords * input, bool continueSearch) {
int32_t count = SHELL_COMMAND_COUNT();
for (int32_t i = 0; i < count; i ++) {
SWords * shellCommand = shellCommands + i;
if (continueSearch && lastMatchIndex != -1 && i <= lastMatchIndex) {
// new match must greate than lastMatchIndex
if (varMode && i == lastMatchIndex) {
// do nothing, var match on lastMatchIndex
} else {
continue;
}
}
// command is large
if (input->count > shellCommand->count ) {
continue;
}
// compare
int32_t index = compareCommand(shellCommand, input);
if (index != -1) {
if (firstMatchIndex == -1)
firstMatchIndex = i;
curMatchIndex = i;
return &shellCommands[i];
}
}
// not match
return NULL;
}
//
// ------------------- print screen --------------------------
//
// delete char count
void deleteCount(Command * cmd, int count) {
int size = 0;
int width = 0;
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
// loop delete
while (--count >= 0 && cmd->cursorOffset > 0) {
getPrevCharSize(cmd->command, cmd->cursorOffset, &size, &width);
memmove(cmd->command + cmd->cursorOffset - size, cmd->command + cmd->cursorOffset,
cmd->commandSize - cmd->cursorOffset);
cmd->commandSize -= size;
cmd->cursorOffset -= size;
cmd->screenOffset -= width;
cmd->endOffset -= width;
}
}
// show screen
void printScreen(TAOS * con, Command * cmd, SWords * match) {
// modify Command
if (firstMatchIndex == -1 || curMatchIndex == -1) {
// no match
return ;
}
// first tab press
const char * str = NULL;
int strLen = 0;
if (firstMatchIndex == curMatchIndex && lastWordBytes == -1) {
// first press tab
SWord * word = MATCH_WORD(match);
str = word->word + match->matchLen;
strLen = word->len - match->matchLen;
lastMatchIndex = firstMatchIndex;
lastWordBytes = word->len;
} else {
if (lastWordBytes == -1)
return ;
deleteCount(cmd, lastWordBytes);
SWord * word = MATCH_WORD(match);
str = word->word;
strLen = word->len;
// set current to last
lastMatchIndex = curMatchIndex;
lastWordBytes = word->len;
}
// insert new
insertChar(cmd, (char *)str, strLen);
}
// main key press tab , matched return true else false
bool firstMatchCommand(TAOS * con, Command * cmd) {
// parse command
SWords* input = (SWords *)malloc(sizeof(SWords));
memset(input, 0, sizeof(SWords));
input->source = cmd->command;
input->source_len = cmd->commandSize;
parseCommand(input, false);
// if have many , default match first, if press tab again , switch to next
curMatchIndex = -1;
lastMatchIndex = -1;
SWords * match = matchCommand(input, true);
if (match == NULL) {
// not match , nothing to do
freeCommand(input);
return false;
}
// print to screen
printScreen(con, cmd, match);
freeCommand(input);
return true;
}
// create input source
void createInputFromFirst(SWords* input, SWords * firstMatch) {
//
// if next pressTabKey , input context come from firstMatch, set matched length with source_len
//
input->source = (char*)malloc(1024);
memset((void* )input->source, 0, 1024);
SWord * word = firstMatch->head;
// source_len = full match word->len + half match with firstMatch->matchLen
for (int i = 0; i < firstMatch->matchIndex && word; i++) {
// combine source from each word
strncpy(input->source + input->source_len, word->word, word->len);
strcat(input->source, " "); // append blank splite
input->source_len += word->len + 1; // 1 is blank length
// move next
word = word->next;
}
// appand half matched word for last
if (word) {
strncpy(input->source + input->source_len, word->word, firstMatch->matchLen);
input->source_len += firstMatch->matchLen;
}
}
// user press Tabkey again is named next , matched return true else false
bool nextMatchCommand(TAOS * con, Command * cmd, SWords * firstMatch) {
if (firstMatch == NULL || firstMatch->head == NULL) {
return false;
}
SWords* input = (SWords *)malloc(sizeof(SWords));
memset(input, 0, sizeof(SWords));
// create input from firstMatch
createInputFromFirst(input, firstMatch);
// parse input
parseCommand(input, false);
// if have many , default match first, if press tab again , switch to next
SWords * match = matchCommand(input, true);
if (match == NULL) {
// if not match , reset all index
firstMatchIndex = -1;
curMatchIndex = -1;
match = matchCommand(input, false);
if (match == NULL) {
freeCommand(input);
return false;
}
}
// print to screen
printScreen(con, cmd, match);
// free
if (input->source) {
free(input->source);
input->source = NULL;
}
freeCommand(input);
return true;
}
// fill with type
bool fillWithType(TAOS * con, Command * cmd, char* pre, int type) {
// get type
STire* tire = tires[type];
char* str = matchNextPrefix(tire, pre);
if (str == NULL) {
return false;
}
// need insert part string
char * part = str + strlen(pre);
// show
int count = strlen(part);
insertChar(cmd, part, count);
cntDel = count; // next press tab delete current append count
free(str);
return true;
}
// fill with type
bool fillTableName(TAOS * con, Command * cmd, char* pre) {
// search stable and table
char * str = tireSearchWord(WT_VAR_STABLE, pre);
if (str == NULL) {
str = tireSearchWord(WT_VAR_TABLE, pre);
if(str == NULL)
return false;
}
// need insert part string
char * part = str + strlen(pre);
// delete autofill count last append
if(cntDel > 0) {
deleteCount(cmd, cntDel);
cntDel = 0;
}
// show
int count = strlen(part);
insertChar(cmd, part, count);
cntDel = count; // next press tab delete current append count
free(str);
return true;
}
//
// find last word from sql select clause
// example :
// 1 select cou -> press tab select count(
// 2 select count(*),su -> select count(*), sum(
// 3 select count(*), su -> select count(*), sum(
//
char * lastWord(char * p) {
// get near from end revert find ' ' and ','
char * p1 = strrchr(p, ' ');
char * p2 = strrchr(p, ',');
if (p1 && p2) {
return MAX(p1, p2) + 1;
} else if (p1) {
return p1 + 1;
} else if(p2) {
return p2 + 1;
} else {
return p;
}
}
bool fieldsInputEnd(char* sql) {
// not in '()'
char* p1 = strrchr(sql, '(');
char* p2 = strrchr(sql, ')');
if (p1 && p2 == NULL) {
// like select count( ' '
return false;
} else if (p1 && p2 && p1 > p2) {
// like select sum(age), count( ' '
return false;
}
// not in ','
char * p = strrchr(sql, ',');
// like select ts, age,' '
if (p) {
++p;
bool allBlank = true;
int cnt = 0; // blank count , continue many blank is one blank
char * plast = NULL;
while(*p) {
if (*p != ' ') {
allBlank = false;
plast = NULL;
} else {
if(plast == NULL) {
plast = p;
cnt ++;
}
}
++p;
}
// any one word is not blank
if(allBlank) {
return false;
}
// if last char not ' ', then not end field, like select count(*), su + tab can fill sum(
if(sql[strlen(sql)-1] != ' ' && cnt <= 1) {
return false;
}
}
char * p3 = strrchr(sql, ' ');
if(p3 == NULL) {
// only one word
return false;
}
return true;
}
// need insert from
bool needInsertFrom(char * sql, int len) {
// last is blank
if(sql[len-1] != ' ') {
// insert from keyword
return false;
}
// select fields input is end
if (!fieldsInputEnd(sql)) {
return false;
}
// can insert from keyword
return true;
}
bool matchSelectQuery(TAOS * con, Command * cmd) {
// if continue press Tab , delete bytes by previous autofill
if (cntDel > 0) {
deleteCount(cmd, cntDel);
cntDel = 0;
}
// match select ...
int len = cmd->commandSize;
char * p = cmd->command;
// remove prefix blank
while (p[0] == ' ' && len > 0) {
p++;
len--;
}
// special range
if(len < 7 || len > 512) {
return false;
}
// select and from
if(strncasecmp(p, "select ", 7) != 0) {
// not select query clause
return false;
}
p += 7;
len -= 7;
char* ps = p = strndup(p, len);
// union all
char * p1;
do {
p1 = strstr(p, UNION_ALL);
if(p1) {
p = p1 + strlen(UNION_ALL);
}
} while (p1);
char * from = strstr(p, " from ");
//last word , maybe empty string or some letters of a string
char * last = lastWord(p);
bool ret = false;
if (from == NULL) {
bool fieldEnd = fieldsInputEnd(p);
// cheeck fields input end then insert from keyword
if (fieldEnd && p[len-1] == ' ') {
insertChar(cmd, "from", 4);
free(ps);
return true;
}
// fill funciton
if(fieldEnd) {
// fields is end , need match keyword
ret = fillWithType(con, cmd, last, WT_VAR_KEYWORD);
} else {
ret = fillWithType(con, cmd, last, WT_VAR_FUNC);
}
free(ps);
return ret;
}
// have from
char * blank = strstr(from + 6, " ");
if (blank == NULL) {
// no table name, need fill
ret = fillTableName(con, cmd, last);
} else {
ret = fillWithType(con, cmd, last, WT_VAR_KEYWORD);
}
free(ps);
return ret;
}
// if is input create fields or tags area, return true
bool isCreateFieldsArea(char * p) {
char * left = strrchr(p, '(');
if (left == NULL) {
// like 'create table st'
return false;
}
char * right = strrchr(p, ')');
if(right == NULL) {
// like 'create table st( '
return true;
}
if (left > right) {
// like 'create table st( ts timestamp, age int) tags(area '
return true;
}
return false;
}
bool matchCreateTable(TAOS * con, Command * cmd) {
// if continue press Tab , delete bytes by previous autofill
if (cntDel > 0) {
deleteCount(cmd, cntDel);
cntDel = 0;
}
// match select ...
int len = cmd->commandSize;
char * p = cmd->command;
// remove prefix blank
while (p[0] == ' ' && len > 0) {
p++;
len--;
}
// special range
if(len < 7 || len > 1024) {
return false;
}
// select and from
if(strncasecmp(p, "create table ", 13) != 0) {
// not select query clause
return false;
}
p += 13;
len -= 13;
char* ps = strndup(p, len);
bool ret = false;
char * last = lastWord(ps);
// check in create fields or tags input area
if (isCreateFieldsArea(ps)) {
ret = fillWithType(con, cmd, last, WT_VAR_DATATYPE);
}
// tags
if (!ret) {
// find only one ')' , can insert tags
char * p1 = strchr(ps, ')');
if (p1) {
if(strchr(p1 + 1, ')') == NULL && strstr(p1 + 1, "tags") == NULL) {
// can insert tags keyword
ret = fillWithType(con, cmd, last, WT_VAR_KEYTAGS);
}
}
}
free(ps);
return ret;
}
bool matchOther(TAOS * con, Command * cmd) {
int len = cmd->commandSize;
char* p = cmd->command;
if (p[len - 1] == '\\') {
// append '\G'
char a[] = "G;";
insertChar(cmd, a, 2);
return true;
}
return false;
}
// main key press tab
void pressTabKey(TAOS * con, Command * cmd) {
// check
if (cmd->commandSize == 0) {
// empty
showHelp();
showOnScreen(cmd);
return ;
}
// save connection to global
varCon = con;
varCmd = cmd;
bool matched = false;
// manual match like create table st( ...
matched = matchCreateTable(con, cmd);
if (matched)
return ;
// shellCommands match
if (firstMatchIndex == -1) {
matched = firstMatchCommand(con, cmd);
} else {
matched = nextMatchCommand(con, cmd, &shellCommands[firstMatchIndex]);
}
if (matched)
return ;
// NOT MATCHED ANYONE
// match other like '\G' ...
matched = matchOther(con, cmd);
if (matched)
return ;
// manual match like select * from ...
matched = matchSelectQuery(con, cmd);
if (matched)
return ;
return ;
}
// press othr key
void pressOtherKey(char c) {
// reset global variant
firstMatchIndex = -1;
lastMatchIndex = -1;
curMatchIndex = -1;
lastWordBytes = -1;
// var names
cursorVar = -1;
varMode = false;
waitAutoFill = false;
cntDel = 0;
if (lastMatch) {
freeMatch(lastMatch);
lastMatch = NULL;
}
//printf(" -> %d <-\n", c);
}
// put name into name, return name length
int getWordName(char* p, char * name, int nameLen) {
//remove prefix blank
while (*p == ' ') {
p++;
}
// get databases name;
int i = 0;
while(p[i] != 0 && i < nameLen - 1) {
name[i] = p[i];
i++;
if(p[i] == ' ' || p[i] == ';'|| p[i] == '(') {
// name end
break;
}
}
name[i] = 0;
return i;
}
// deal use db, if have 'use' return true
bool dealUseDB(char * sql) {
// check use keyword
if(strncasecmp(sql, "use ", 4) != 0) {
return false;
}
char db[256];
char *p = sql + 4;
if (getWordName(p, db, sizeof(db)) == 0) {
// no name , return
return true;
}
// dbName is previous use open db name
if (strcasecmp(db, dbName) == 0) {
// same , no need switch
return true;
}
// switch new db
pthread_mutex_lock(&tiresMutex);
// STABLE set null
STire* tire = tires[WT_VAR_STABLE];
tires[WT_VAR_STABLE] = NULL;
if(tire) {
freeTire(tire);
}
// TABLE set null
tire = tires[WT_VAR_TABLE];
tires[WT_VAR_TABLE] = NULL;
if(tire) {
freeTire(tire);
}
// save
strcpy(dbName, db);
pthread_mutex_unlock(&tiresMutex);
return true;
}
// deal create, if have 'create' return true
bool dealCreateCommand(char * sql) {
// check keyword
if(strncasecmp(sql, "create ", 7) != 0) {
return false;
}
char name[1024];
char *p = sql + 7;
if (getWordName(p, name, sizeof(name)) == 0) {
// no name , return
return true;
}
int type = -1;
// dbName is previous use open db name
if (strcasecmp(name, "database") == 0) {
type = WT_VAR_DBNAME;
} else if (strcasecmp(name, "table") == 0) {
if(strstr(sql, " tags") != NULL && strstr(sql, " using ") == NULL)
type = WT_VAR_STABLE;
else
type = WT_VAR_TABLE;
} else if (strcasecmp(name, "user") == 0) {
type = WT_VAR_USERNAME;
} else {
// no match , return
return true;
}
// move next
p += strlen(name);
// get next word , that is table name
if (getWordName(p, name, sizeof(name)) == 0) {
// no name , return
return true;
}
// switch new db
pthread_mutex_lock(&tiresMutex);
// STABLE set null
STire* tire = tires[type];
if(tire) {
insertWord(tire, name);
}
pthread_mutex_unlock(&tiresMutex);
return true;
}
// deal create, if have 'drop' return true
bool dealDropCommand(char * sql) {
// check keyword
if(strncasecmp(sql, "drop ", 5) != 0) {
return false;
}
char name[1024];
char *p = sql + 5;
if (getWordName(p, name, sizeof(name)) == 0) {
// no name , return
return true;
}
int type = -1;
// dbName is previous use open db name
if (strcasecmp(name, "database") == 0) {
type = WT_VAR_DBNAME;
} else if (strcasecmp(name, "table") == 0) {
type = WT_VAR_ALLTABLE;
} else if (strcasecmp(name, "dnode") == 0) {
type = WT_VAR_DNODEID;
} else if (strcasecmp(name, "user") == 0) {
type = WT_VAR_USERNAME;
} else {
// no match , return
return true;
}
// move next
p += strlen(name);
// get next word , that is table name
if (getWordName(p, name, sizeof(name)) == 0) {
// no name , return
return true;
}
// switch new db
pthread_mutex_lock(&tiresMutex);
// STABLE set null
if(type == WT_VAR_ALLTABLE) {
bool del = false;
// del in stable
STire* tire = tires[WT_VAR_STABLE];
if(tire)
del = deleteWord(tire, name);
// del in table
if(!del) {
tire = tires[WT_VAR_TABLE];
if(tire)
del = deleteWord(tire, name);
}
} else {
// OTHER TYPE
STire* tire = tires[type];
if(tire)
deleteWord(tire, name);
}
pthread_mutex_unlock(&tiresMutex);
return true;
}
// callback autotab module after shell sql execute
void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb) {
char * sql = sqlstr;
// remove prefix blank
while (*sql == ' ') {
sql++;
}
if(dealUseDB(sql)) {
// change to new db
return ;
}
// create command add name to autotab
if(dealCreateCommand(sql)) {
return ;
}
// drop command remove name from autotab
if(dealDropCommand(sql)) {
return ;
}
return ;
}
......@@ -79,8 +79,11 @@ void insertChar(Command *cmd, char *c, int size) {
/* update the values */
cmd->commandSize += size;
cmd->cursorOffset += size;
cmd->screenOffset += wcwidth(wc);
cmd->endOffset += wcwidth(wc);
for (int i = 0; i < size; i++) {
mbtowc(&wc, c + i, size);
cmd->screenOffset += wcwidth(wc);
cmd->endOffset += wcwidth(wc);
}
showOnScreen(cmd);
}
......@@ -179,6 +182,16 @@ void positionCursorHome(Command *cmd) {
}
}
void positionCursorMiddle(Command *cmd) {
if (cmd->endOffset > 0) {
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
cmd->cursorOffset = cmd->commandSize/2;
cmd->screenOffset = cmd->endOffset/2;
showOnScreen(cmd);
}
}
void positionCursorEnd(Command *cmd) {
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
......
......@@ -27,6 +27,7 @@
#include "tglobal.h"
#include "tsclient.h"
#include "cJSON.h"
#include "shellAuto.h"
#include <regex.h>
......@@ -136,7 +137,7 @@ void shellInit(SShellArguments *_args) {
exit(EXIT_SUCCESS);
}
#endif
return;
}
......@@ -261,41 +262,22 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int64_t st, et;
wordexp_t full_path;
char * sptr = NULL;
char * tmp = NULL;
char * cptr = NULL;
char * fname = NULL;
bool printMode = false;
int match;
sptr = command;
while ((sptr = tstrstr(sptr, ">>", true)) != NULL) {
// find the last ">>" if any
tmp = sptr;
sptr += 2;
}
sptr = tmp;
if (sptr != NULL) {
// select ... where col >> n op m ...;
match = regex_match(sptr + 2, "^\\s*.{1,}\\s*[\\>|\\<|\\<=|\\>=|=|!=]\\s*.{1,};\\s*$", REG_EXTENDED | REG_ICASE);
if (match == 0) {
// select col >> n from ...;
match = regex_match(sptr + 2, "^\\s*.{1,}\\s{1,}.{1,};\\s*$", REG_EXTENDED | REG_ICASE);
if (match == 0) {
cptr = tstrstr(command, ";", true);
if (cptr != NULL) {
*cptr = '\0';
}
if (wordexp(sptr + 2, &full_path, 0) != 0) {
fprintf(stderr, "ERROR: invalid filename: %s\n", sptr + 2);
return;
}
*sptr = '\0';
fname = full_path.we_wordv[0];
}
if ((sptr = tstrstr(command, ">>", true)) != NULL) {
cptr = tstrstr(command, ";", true);
if (cptr != NULL) {
*cptr = '\0';
}
if (wordexp(sptr + 2, &full_path, 0) != 0) {
fprintf(stderr, "ERROR: invalid filename: %s\n", sptr + 2);
return;
}
*sptr = '\0';
fname = full_path.we_wordv[0];
}
if ((sptr = tstrstr(command, "\\G", true)) != NULL) {
......@@ -323,10 +305,13 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int64_t oresult = atomic_load_64(&result);
if (regex_match(command, "^\\s*use\\s+([a-zA-Z0-9_]+|`.+`)\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\n\n");
fflush(stdout);
// call back auto tab module
callbackAutoTab(command, pSql, true);
atomic_store_64(&result, 0);
freeResultWithRid(oresult);
return;
......@@ -361,10 +346,14 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
} else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
}
} else {
int num_rows_affacted = taos_affected_rows(pSql);
et = taosGetTimestampUs();
printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6);
// call auto tab
callbackAutoTab(command, pSql, false);
}
printf("\n");
......@@ -1155,7 +1144,7 @@ int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, si
int parse_cloud_dsn() {
if (args.cloudDsn == NULL) {
fprintf(stderr, "Cannot read cloud service info\n");
return 1;
return -1;
} else {
char *start = strstr(args.cloudDsn, "http://");
if (start != NULL) {
......@@ -1171,7 +1160,7 @@ int parse_cloud_dsn() {
char *port = strstr(args.cloudHost, ":");
if ((port == NULL) || (port + strlen(":")) == NULL) {
fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn);
return 1;
return -1;
}
char *token = strstr(port + strlen(":"), "?token=");
if ((token == NULL) || (token + strlen("?token=")) == NULL ||
......@@ -1229,12 +1218,11 @@ int wsclient_handshake() {
return 0;
}
int wsclient_send(char *strdata) {
int wsclient_send(char *strdata, WebSocketFrameType frame) {
struct timeval tv;
unsigned char mask[4];
unsigned int mask_int;
unsigned long long payload_len;
unsigned char finNopcode;
unsigned int payload_len_small;
unsigned int payload_offset = 6;
unsigned int len_size;
......@@ -1248,7 +1236,6 @@ int wsclient_send(char *strdata) {
mask_int = rand();
memcpy(mask, &mask_int, 4);
payload_len = strlen(strdata);
finNopcode = 0x81;
if (payload_len <= 125) {
frame_size = 6 + payload_len;
payload_len_small = payload_len;
......@@ -1266,7 +1253,7 @@ int wsclient_send(char *strdata) {
}
data = (char *)malloc(frame_size);
memset(data, 0, frame_size);
*data = finNopcode;
*data = frame;
*(data + 1) = payload_len_small | 0x80;
if (payload_len_small == 126) {
payload_len &= 0xffff;
......@@ -1293,13 +1280,16 @@ int wsclient_send(char *strdata) {
sent += i;
}
if (i < 0) {
fprintf(stderr, "websocket send data error\n");
fprintf(stderr, "websocket send data error, please check the server\n");
free(data);
return -1;
}
free(data);
return 0;
}
int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int id) {
int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id) {
int code = 1;
cJSON *json = cJSON_CreateObject();
cJSON *_args = cJSON_CreateObject();
cJSON_AddNumberToObject(_args, "req_id", 1);
......@@ -1323,15 +1313,22 @@ int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int id) {
cJSON_AddStringToObject(json, "action", "fetch_block");
cJSON_AddNumberToObject(_args, "id", id);
break;
case WS_CLOSE:
cJSON_AddStringToObject(json, "action", "close");
cJSON_AddNumberToObject(_args, "id", id);
break;
}
cJSON_AddItemToObject(json, "args", _args);
char *strdata = NULL;
strdata = cJSON_Print(json);
if (wsclient_send(strdata)) {
free(strdata);
return -1;
}
return 0;
if (wsclient_send(strdata, TEXT_FRAME)) {
goto OVER;
}
code = 0;
OVER:
free(strdata);
cJSON_Delete(json);
return code;
}
int wsclient_conn() {
......@@ -1366,7 +1363,6 @@ int wsclient_conn() {
} else {
fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port);
}
return 0;
} else {
cJSON *message = cJSON_GetObjectItem(root, "message");
......@@ -1381,143 +1377,135 @@ int wsclient_conn() {
return -1;
}
cJSON *wsclient_parse_response() {
char *recv_buffer = calloc(1, 4096);
int start = 0;
bool found = false;
int received = 0;
int bytes;
int recv_length = 4095;
do {
bytes = recv(args.socket, recv_buffer + received, recv_length - received, 0);
if (bytes == -1) {
free(recv_buffer);
fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes);
return NULL;
void wsclient_parse_frame(SWSParser * parser, uint8_t * recv_buffer) {
unsigned char msg_opcode = recv_buffer[0] & 0x0F;
unsigned char msg_masked = (recv_buffer[1] >> 7) & 0x01;
int payload_length = 0;
int pos = 2;
int length_field = recv_buffer[1] &(~0x80);
unsigned int mask = 0;
if (length_field <= 125) {
payload_length = length_field;
} else if (length_field == 126) {
payload_length = recv_buffer[2];
for (int i = 0; i < 1; i++) {
payload_length = (payload_length << 8) + recv_buffer[3 + i];
}
if (!found) {
for (; start < recv_length - received; start++) {
if ((recv_buffer + start)[0] == '{') {
found = true;
break;
}
}
}
if (NULL != strstr(recv_buffer + start, "}")) {
break;
pos += 2;
} else if (length_field == 127) {
payload_length = recv_buffer[2];
for (int i = 0; i < 7; i++) {
payload_length = (payload_length << 8) + recv_buffer[3 + i];
}
received += bytes;
if (received >= recv_length) {
recv_length += 4096;
recv_buffer = realloc(recv_buffer + start, recv_length);
pos += 8;
}
if (msg_masked) {
mask = *((unsigned int *) (recv_buffer + pos));
pos += 4;
const uint8_t *c = recv_buffer + pos;
for (int i = 0; i < payload_length; i++) {
recv_buffer[i] = c[i] ^ ((unsigned char *) (&mask))[i % 4];
}
} while (1);
cJSON *res = cJSON_Parse(recv_buffer + start);
if (res == NULL) {
fprintf(stderr, "fail to parse response into json: %s\n", recv_buffer + start);
free(recv_buffer);
return NULL;
}
return res;
if (msg_opcode == 0x9) {
parser->frame = PING_FRAME;
}
parser->offset = pos;
parser->payload_length = payload_length;
}
TAOS_FIELD *wsclient_print_header(cJSON *query, int *pcols, int *pprecison) {
TAOS_FIELD *fields = NULL;
cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count");
if (cJSON_IsNumber(fields_count)) {
*pcols = (int)fields_count->valueint;
fields = calloc((int)fields_count->valueint, sizeof(TAOS_FIELD));
cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names");
cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types");
cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths");
if (cJSON_IsArray(fields_names) && cJSON_IsArray(fields_types) && cJSON_IsArray(fields_lengths)) {
for (int i = 0; i < (int)fields_count->valueint; i++) {
strncpy(fields[i].name, cJSON_GetArrayItem(fields_names, i)->valuestring, 65);
fields[i].type = (uint8_t)cJSON_GetArrayItem(fields_types, i)->valueint;
fields[i].bytes = (int16_t)cJSON_GetArrayItem(fields_lengths, i)->valueint;
}
cJSON *precision = cJSON_GetObjectItem(query, "precision");
if (cJSON_IsNumber(precision)) {
*pprecison = (int)precision->valueint;
int width[TSDB_MAX_COLUMNS];
for (int col = 0; col < (int)fields_count->valueint; col++) {
width[col] = calcColWidth(fields + col, (int)precision->valueint);
}
printHeader(fields, width, (int)fields_count->valueint);
return fields;
} else {
fprintf(stderr, "Invalid precision key in json\n");
}
} else {
fprintf(stderr, "Invalid fields_names/fields_types/fields_lengths key in json\n");
char *wsclient_get_response() {
uint8_t recv_buffer[TEMP_RECV_BUF]= {0};
int received = 0;
SWSParser parser;
int bytes = recv(args.socket, recv_buffer + received, TEMP_RECV_BUF - 1, 0);
if (bytes <= 0) {
fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes);
return NULL;
}
wsclient_parse_frame(&parser, recv_buffer);
if (parser.frame == PING_FRAME) {
if (wsclient_send("pong", PONG_FRAME)) {
return NULL;
}
} else {
fprintf(stderr, "Invalid fields_count key in json\n");
return wsclient_get_response();
}
char* response = calloc(1, parser.payload_length + 1);
int pos = bytes - parser.offset;
memcpy(response, recv_buffer + parser.offset, pos);
while (pos < parser.payload_length) {
bytes = recv(args.socket, response + pos, parser.payload_length - pos, 0);
pos += bytes;
}
if (fields != NULL) {
free(fields);
response[pos] = '\0';
return response;
}
int wsclient_fetch_fields(cJSON *query, TAOS_FIELD * fields, int cols) {
cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names");
cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types");
cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths");
if (!cJSON_IsArray(fields_names) || !cJSON_IsArray(fields_types) || !cJSON_IsArray(fields_lengths)) {
fprintf(stderr, "Invalid or miss 'fields_names'/'fields_types'/'fields_lengths' key in response\n");
return -1;
}
for (int i = 0; i < cols; i++) {
cJSON* field_name = cJSON_GetArrayItem(fields_names, i);
cJSON* field_type = cJSON_GetArrayItem(fields_types, i);
cJSON* field_length = cJSON_GetArrayItem(fields_lengths, i);
if (!cJSON_IsString(field_name) || !cJSON_IsNumber(field_type) || !cJSON_IsNumber(field_length)) {
fprintf(stderr, "Invalid or miss 'field_name'/'field_type'/'field_length' in query response");
return -1;
}
strncpy(fields[i].name, field_name->valuestring, 65);
fields[i].type = (uint8_t)field_type->valueint;
fields[i].bytes = (int16_t)field_length->valueint;
}
return NULL;
return 0;
}
int wsclient_check(cJSON *root, int64_t st, int64_t et) {
cJSON *code = cJSON_GetObjectItem(root, "code");
if (cJSON_IsNumber(code)) {
if (code->valueint == 0) {
return 0;
} else {
cJSON *message = cJSON_GetObjectItem(root, "message");
if (cJSON_IsString(message)) {
fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid message key in json\n");
}
}
} else {
fprintf(stderr, "Invalid code key in json\n");
cJSON *message = cJSON_GetObjectItem(root, "message");
if (!cJSON_IsNumber(code) || !cJSON_IsString(message)) {
fprintf(stderr, "Invalid or miss 'code'/'message' in response\n");
return -1;
}
return -1;
if (code->valueint != 0) {
fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6);
return -1;
}
return 0;
}
int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) {
char *recv_buffer = calloc(1, 4096);
int col_length = 0;
for (int i = 0; i < cols; i++) {
col_length += fields[i].bytes;
char* response = wsclient_get_response();
if (response == NULL) {
return -1;
}
int total_recv_len = col_length * rows + 12;
int received = 0;
int recv_length = 4095;
int start = 0;
int pos;
do {
int bytes = recv(args.socket, recv_buffer + received, recv_length - received, 0);
received += bytes;
if (received >= recv_length) {
recv_length += 4096;
recv_buffer = realloc(recv_buffer, recv_length);
}
} while (received < total_recv_len);
while (1) {
if (*(int64_t *)(recv_buffer + start) == id) {
break;
}
start++;
if (*(int64_t *)response != id) {
fprintf(stderr, "Mismatch id with %"PRId64" expect %"PRId64"\n", *(int64_t *)response, id);
free(response);
return -1;
}
start += 8;
int pos;
int width[TSDB_MAX_COLUMNS];
for (int c = 0; c < cols; c++) {
width[c] = calcColWidth(fields + c, precision);
}
for (int i = 0; i < rows; i++) {
if (*pshowed_rows == DEFAULT_RES_SHOW_NUM) {
free(recv_buffer);
printf("\n");
printf(" Notice: The result shows only the first %d rows.\n", DEFAULT_RES_SHOW_NUM);
printf("\n");
printf(" You can use Ctrl+C to stop the underway fetching.\n");
printf("\n");
free(response);
return 0;
}
}
for (int c = 0; c < cols; c++) {
pos = start;
pos = 8;
pos += i * fields[c].bytes;
for (int j = 0; j < c; j++) {
pos += fields[j].bytes * rows;
......@@ -1526,17 +1514,17 @@ int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int
int16_t length = 0;
if (fields[c].type == TSDB_DATA_TYPE_NCHAR || fields[c].type == TSDB_DATA_TYPE_BINARY ||
fields[c].type == TSDB_DATA_TYPE_JSON) {
length = *(int16_t *)(recv_buffer + pos);
length = *(int16_t *)(response + pos);
pos += 2;
}
printField((const char *)(recv_buffer + pos), fields + c, width[c], (int32_t)length, precision);
printField((const char *)(response + pos), fields + c, width[c], (int32_t)length, precision);
putchar(' ');
putchar('|');
}
putchar('\n');
*pshowed_rows += 1;
}
free(recv_buffer);
free(response);
return 0;
}
......@@ -1546,90 +1534,129 @@ void wsclient_query(char *command) {
if (wsclient_send_sql(command, WS_QUERY, 0)) {
return;
}
et = taosGetTimestampUs();
cJSON *query = wsclient_parse_response();
char *query_buffer = wsclient_get_response();
if (query_buffer == NULL) {
return;
}
cJSON* query = cJSON_Parse(query_buffer);
if (query == NULL) {
fprintf(stderr, "Failed to parse response into json: %s\n", query_buffer);
free(query_buffer);
return;
}
free(query_buffer);
et = taosGetTimestampUs();
if (wsclient_check(query, st, et)) {
cJSON_Delete(query);
return;
}
cJSON *is_update = cJSON_GetObjectItem(query, "is_update");
if (cJSON_IsBool(is_update)) {
if (is_update->valueint) {
cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (cJSON_IsNumber(affected_rows)) {
printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid affected_rows key in json\n");
}
cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count");
cJSON *precisionObj = cJSON_GetObjectItem(query, "precision");
cJSON *id = cJSON_GetObjectItem(query, "id");
if (!cJSON_IsBool(is_update) ||
!cJSON_IsNumber(fields_count) ||
!cJSON_IsNumber(precisionObj) ||
!cJSON_IsNumber(id)) {
fprintf(stderr, "Invalid or miss 'is_update'/'fields_count'/'precision'/'id' in query response\n");
cJSON_Delete(query);
return;
}
if (is_update->valueint) {
cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (cJSON_IsNumber(affected_rows)) {
et = taosGetTimestampUs();
printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
} else {
int cols = 0;
int precision = 0;
int64_t total_rows = 0;
int showed_rows = 0;
TAOS_FIELD *fields = wsclient_print_header(query, &cols, &precision);
if (fields != NULL) {
cJSON *id = cJSON_GetObjectItem(query, "id");
if (cJSON_IsNumber(id)) {
bool completed = false;
while (!completed) {
if (wsclient_send_sql(NULL, WS_FETCH, (int)id->valueint) == 0) {
cJSON *fetch = wsclient_parse_response();
if (fetch != NULL) {
if (wsclient_check(fetch, st, et) == 0) {
cJSON *_completed = cJSON_GetObjectItem(fetch, "completed");
if (cJSON_IsBool(_completed)) {
if (_completed->valueint) {
completed = true;
continue;
}
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
if (cJSON_IsNumber(rows)) {
total_rows += rows->valueint;
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (cJSON_IsArray(lengths)) {
for (int i = 0; i < cols; i++) {
fields[i].bytes = (int16_t)(cJSON_GetArrayItem(lengths, i)->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, (int)id->valueint) == 0) {
wsclient_print_data((int)rows->valueint, fields, cols, id->valueint, precision, &showed_rows);
}
}
continue;
} else {
fprintf(stderr, "Invalid lengths key in json\n");
}
} else {
fprintf(stderr, "Invalid rows key in json\n");
}
} else {
fprintf(stderr, "Invalid completed key in json\n");
}
}
}
}
fprintf(stderr, "err occured in fetch/fetch_block ws actions\n");
break;
}
if (showed_rows == DEFAULT_RES_SHOW_NUM) {
printf("\n");
printf(" Notice: The result shows only the first %d rows.\n", DEFAULT_RES_SHOW_NUM);
printf("\n");
}
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid id key in json\n");
}
free(fields);
fprintf(stderr, "Invalid or miss 'affected_rows' key in response\n");
}
cJSON_Delete(query);
return;
}
ws_id = id->valueint;
int cols = (int)fields_count->valueint;
int precision = (int)precisionObj->valueint;
int64_t total_rows = 0;
int showed_rows = 0;
bool completed = false;
TAOS_FIELD fields[TSDB_MAX_COLUMNS];
if (wsclient_fetch_fields(query, fields, cols)) {
cJSON_Delete(query);
return;
}
int width[TSDB_MAX_COLUMNS];
for (int i = 0; i < cols; ++i) {
width[i] = calcColWidth(fields + i, precision);
}
printHeader(fields, width, cols);
cJSON_Delete(query);
while (!completed && !stop_fetch) {
if (wsclient_send_sql(NULL, WS_FETCH, ws_id)) {
return;
}
char *fetch_buffer = wsclient_get_response();
if (fetch_buffer == NULL) {
return;
}
cJSON *fetch = cJSON_Parse(fetch_buffer);
if (fetch == NULL) {
fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer);
free(fetch_buffer);
return;
}
free(fetch_buffer);
if (wsclient_check(fetch, st, et)) {
cJSON_Delete(fetch);
return;
}
cJSON *completedObj = cJSON_GetObjectItem(fetch, "completed");
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (!cJSON_IsBool(completedObj) || !cJSON_IsNumber(rows)) {
fprintf(stderr, "Invalid or miss 'completed'/'rows' in fetch response\n");
cJSON_Delete(fetch);
return;
}
if (completedObj->valueint) {
cJSON_Delete(fetch);
completed = true;
continue;
}
total_rows += rows->valueint;
if (!cJSON_IsArray(lengths)) {
fprintf(stderr, "Invalid or miss 'lengths' in fetch response\n");
cJSON_Delete(fetch);
return;
}
for (int i = 0; i < cols; i++) {
cJSON* length = cJSON_GetArrayItem(lengths, i);
if (!cJSON_IsNumber(length)) {
fprintf(stderr, "Invalid or miss 'lengths' key in fetch response\n");
cJSON_Delete(fetch);
return;
}
fields[i].bytes = (int16_t)(length->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, ws_id)) {
cJSON_Delete(fetch);
return;
}
if (wsclient_print_data((int)rows->valueint, fields, cols, ws_id, precision, &showed_rows)) {
cJSON_Delete(fetch);
return;
}
cJSON_Delete(fetch);
continue;
}
}
et = taosGetTimestampUs();
if (stop_fetch) {
printf("Query interrupted, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
stop_fetch = false;
} else {
fprintf(stderr, "Invalid is_update key in json\n");
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
}
cJSON_Delete(query);
return;
}
\ No newline at end of file
......@@ -20,6 +20,7 @@
#include "shellCommand.h"
#include "tkey.h"
#include "tulog.h"
#include "shellAuto.h"
#define OPT_ABORT 1 /* �Cabort */
......@@ -283,7 +284,12 @@ int32_t shellReadCommand(TAOS *con, char *command) {
utf8_array[k] = c;
}
insertChar(&cmd, utf8_array, count);
pressOtherKey(c);
} else if (c == TAB_KEY) {
// press TAB key
pressTabKey(con, &cmd);
} else if (c < '\033') {
pressOtherKey(c);
// Ctrl keys. TODO: Implement ctrl combinations
switch (c) {
case 1: // ctrl A
......@@ -329,8 +335,12 @@ int32_t shellReadCommand(TAOS *con, char *command) {
case 21: // Ctrl + U;
clearLineBefore(&cmd);
break;
case 23: // Ctrl + W;
positionCursorMiddle(&cmd);
break;
}
} else if (c == '\033') {
pressOtherKey(c);
c = (char)getchar();
switch (c) {
case '[':
......@@ -405,9 +415,11 @@ int32_t shellReadCommand(TAOS *con, char *command) {
break;
}
} else if (c == 0x7f) {
pressOtherKey(c);
// press delete key
backspaceChar(&cmd);
} else {
pressOtherKey(c);
insertChar(&cmd, &c, 1);
}
}
......@@ -556,14 +568,14 @@ void showOnScreen(Command *cmd) {
/* assert(size >= 0); */
int width = wcwidth(wc);
if (remain_column > width) {
printf("%lc", wc);
fprintf(stdout, "%lc", wc);
remain_column -= width;
} else {
if (remain_column == width) {
printf("%lc\n\r", wc);
fprintf(stdout, "%lc\n\r", wc);
remain_column = w.ws_col;
} else {
printf("\n\r%lc", wc);
fprintf(stdout, "\n\r%lc", wc);
remain_column = w.ws_col - width;
}
}
......
......@@ -17,14 +17,20 @@
#include "shell.h"
#include "tconfig.h"
#include "tnettest.h"
#include "shellCommand.h"
#include "shellAuto.h"
pthread_t pid;
static tsem_t cancelSem;
bool stop_fetch = false;
int64_t ws_id = 0;
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
tsem_post(&cancelSem);
}
void shellRestfulSendInterruptHandler(int32_t signum, void *sigInfo, void *context) {}
void *cancelHandler(void *arg) {
setThreadName("cancelHandler");
......@@ -33,7 +39,12 @@ void *cancelHandler(void *arg) {
taosMsleep(10);
continue;
}
if (args.restful || args.cloud) {
stop_fetch = true;
if (wsclient_send_sql(NULL, WS_CLOSE, ws_id)) {
exit(EXIT_FAILURE);
}
}
#ifdef LINUX
int64_t rid = atomic_val_compare_exchange_64(&result, result, 0);
SSqlObj* pSql = taosAcquireRef(tscObjRef, rid);
......@@ -87,6 +98,7 @@ SShellArguments args = {.host = NULL,
.pktNum = 100,
.pktType = "TCP",
.netTestRole = NULL,
.cloudDsn = NULL,
.cloud = true,
.cloudHost = NULL,
.cloudPort = NULL,
......@@ -159,13 +171,21 @@ int main(int argc, char* argv[]) {
taosSetSignal(SIGINT, shellQueryInterruptHandler);
taosSetSignal(SIGHUP, shellQueryInterruptHandler);
taosSetSignal(SIGABRT, shellQueryInterruptHandler);
if (args.restful || args.cloud) {
#ifdef LINUX
taosSetSignal(SIGPIPE, shellRestfulSendInterruptHandler);
#endif
}
/* Get grant information */
shellGetGrantInfo(args.con);
shellAutoInit();
/* Loop to query the input. */
while (1) {
pthread_create(&pid, NULL, shellLoopQuery, args.con);
pthread_join(pid, NULL);
}
shellAutoExit();
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define __USE_XOPEN
#include "os.h"
#include "tire.h"
// ----------- interface -------------
// create prefix search tree
STire* createTire(char type) {
STire* tire = malloc(sizeof(STire));
memset(tire, 0, sizeof(STire));
tire->ref = 1; // init is 1
tire->type = type;
tire->root.d = (STireNode **)calloc(CHAR_CNT, sizeof(STireNode *));
return tire;
}
// free tire node
void freeTireNode(STireNode* node) {
if (node == NULL)
return ;
// nest free sub node on array d
if(node->d) {
for (int i = 0; i < CHAR_CNT; i++) {
freeTireNode(node->d[i]);
}
tfree(node->d);
}
// free self
tfree(node);
}
// destroy prefix search tree
void freeTire(STire* tire) {
// free nodes
for (int i = 0; i < CHAR_CNT; i++) {
freeTireNode(tire->root.d[i]);
}
tfree(tire->root.d);
// free from list
StrName * item = tire->head;
while (item) {
// free string
tfree(item->name);
// free node
tfree(item);
// move next
item = item->next;
}
tire->head = tire->tail = NULL;
// free tire
tfree(tire);
}
// insert a new word to list
bool insertToList(STire* tire, char* word) {
StrName * p = (StrName *)malloc(sizeof(StrName));
p->name = strdup(word);
p->next = NULL;
if(tire->head == NULL) {
tire->head = p;
tire->tail = p;
}else {
tire->tail->next = p;
tire->tail = p;
}
return true;
}
// insert a new word to tree
bool insertToTree(STire* tire, char* word, int len) {
int m = 0;
STireNode ** nodes = tire->root.d;
for (int i = 0; i < len; i++) {
m = word[i] - FIRST_ASCII;
if (m < 0 || m > CHAR_CNT) {
return false;
}
if (nodes[m] == NULL) {
// no pointer
STireNode* p = (STireNode* )tmalloc(sizeof(STireNode));
memset(p, 0, sizeof(STireNode));
nodes[m] = p;
if (i == len - 1) {
// is end
p->end = true;
break;
}
}
if (nodes[m]->d == NULL) {
// malloc d
nodes[m]->d = (STireNode **)calloc(CHAR_CNT, sizeof(STireNode *));
}
// move to next node
nodes = nodes[m]->d;
}
// add count
tire->count += 1;
return true;
}
// insert a new word
bool insertWord(STire* tire, char* word) {
int len = strlen(word);
if (len >= MAX_WORD_LEN) {
return false;
}
switch (tire->type) {
case TIRE_TREE:
return insertToTree(tire, word, len);
case TIRE_LIST:
return insertToList(tire, word);
default:
break;
}
return false;
}
// delete one word from list
bool deleteFromList(STire* tire, char* word) {
StrName * item = tire->head;
while (item) {
if (strcmp(item->name, word) == 0) {
// found, reset empty to delete
item->name[0] = 0;
}
// move next
item = item->next;
}
return true;
}
// delete one word from tree
bool deleteFromTree(STire* tire, char* word, int len) {
int m = 0;
bool del = false;
STireNode** nodes = tire->root.d;
for (int i = 0; i < len; i++) {
m = word[i] - FIRST_ASCII;
if (m < 0 || m >= CHAR_CNT) {
return false;
}
if (nodes[m] == NULL) {
// no found
return false;
} else {
// not null
if(i == len - 1) {
// this is last, only set end false , not free node
nodes[m]->end = false;
del = true;
break;
}
}
if(nodes[m]->d == NULL)
break;
// move to next node
nodes = nodes[m]->d;
}
// reduce count
if (del) {
tire->count -= 1;
}
return del;
}
// insert a new word
bool deleteWord(STire* tire, char* word) {
int len = strlen(word);
if (len >= MAX_WORD_LEN) {
return false;
}
switch (tire->type) {
case TIRE_TREE:
return deleteFromTree(tire, word, len);
case TIRE_LIST:
return deleteFromList(tire, word);
default:
break;
}
return false;
}
void addWordToMatch(SMatch* match, char* word){
// malloc new
SMatchNode* node = (SMatchNode* )tmalloc(sizeof(SMatchNode));
memset(node, 0, sizeof(SMatchNode));
node->word = strdup(word);
// append to match
if (match->head == NULL) {
match->head = match->tail = node;
} else {
match->tail->next = node;
match->tail = node;
}
match->count += 1;
}
// enum all words from node
void enumAllWords(STireNode** nodes, char* prefix, SMatch* match) {
STireNode * c;
char word[MAX_WORD_LEN];
int len = strlen(prefix);
for (int i = 0; i < CHAR_CNT; i++) {
c = nodes[i];
if (c == NULL) {
// chain end node
continue;
} else {
// combine word string
memset(word, 0, sizeof(word));
strcpy(word, prefix);
word[len] = FIRST_ASCII + i; // append current char
// chain middle node
if (c->end) {
// have end flag
addWordToMatch(match, word);
}
// nested call next layer
if (c->d)
enumAllWords(c->d, word, match);
}
}
}
// match prefix from list
void matchPrefixFromList(STire* tire, char* prefix, SMatch* match) {
StrName * item = tire->head;
int len = strlen(prefix);
while (item) {
if ( strncmp(item->name, prefix, len) == 0) {
// prefix matched
addWordToMatch(match, item->name);
}
// move next
item = item->next;
}
}
// match prefix words, if match is not NULL , put all item to match and return match
void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
SMatch* root = match;
int m = 0;
STireNode* c = 0;
int len = strlen(prefix);
if (len >= MAX_WORD_LEN) {
return;
}
STireNode** nodes = tire->root.d;
for (int i = 0; i < len; i++) {
m = prefix[i] - FIRST_ASCII;
if (m < 0 || m > CHAR_CNT) {
return;
}
// match
c = nodes[m];
if (c == NULL) {
// arrive end
break;
}
// previous items already matched
if (i == len - 1) {
// malloc match if not pass by param match
if (root == NULL) {
root = (SMatch* )tmalloc(sizeof(SMatch));
memset(root, 0, sizeof(SMatch));
strcpy(root->pre, prefix);
}
// prefix is match to end char
if (c->d)
enumAllWords(c->d, prefix, root);
} else {
// move to next node continue match
if(c->d == NULL)
break;
nodes = c->d;
}
}
// return
return ;
}
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {
if(match == NULL) {
match = (SMatch* )tmalloc(sizeof(SMatch));
memset(match, 0, sizeof(SMatch));
}
switch (tire->type) {
case TIRE_TREE:
matchPrefixFromTree(tire, prefix, match);
case TIRE_LIST:
matchPrefixFromList(tire, prefix, match);
default:
break;
}
// return if need
if (match->count == 0) {
freeMatch(match);
match = NULL;
}
return match;
}
// get all items from tires tree
void enumFromList(STire* tire, SMatch* match) {
StrName * item = tire->head;
while (item) {
if (item->name[0] != 0) {
// not delete
addWordToMatch(match, item->name);
}
// move next
item = item->next;
}
}
// get all items from tires tree
void enumFromTree(STire* tire, SMatch* match) {
char pre[2] ={0, 0};
STireNode* c;
// enum first layer
for (int i = 0; i < CHAR_CNT; i++) {
pre[0] = FIRST_ASCII + i;
// each node
c = tire->root.d[i];
if (c == NULL) {
// this branch no data
continue;
}
// this branch have data
if(c->end)
addWordToMatch(match, pre);
else
matchPrefix(tire, pre, match);
}
}
// get all items from tires tree
SMatch* enumAll(STire* tire) {
SMatch* match = (SMatch* )tmalloc(sizeof(SMatch));
memset(match, 0, sizeof(SMatch));
switch (tire->type) {
case TIRE_TREE:
enumFromTree(tire, match);
case TIRE_LIST:
enumFromList(tire, match);
default:
break;
}
// return if need
if (match->count == 0) {
freeMatch(match);
match = NULL;
}
return match;
}
// free match result
void freeMatchNode(SMatchNode* node) {
// first free next
if (node->next)
freeMatchNode(node->next);
// second free self
if (node->word)
free(node->word);
free(node);
}
// free match result
void freeMatch(SMatch* match) {
// first free next
if (match->head) {
freeMatchNode(match->head);
}
// second free self
free(match);
}
......@@ -128,7 +128,7 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
tsdbStopStream(pRepo);
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
taosDestroyThread(pRepo->pthread);
pRepo->pthread = NULL;
}
......
......@@ -26,7 +26,7 @@ extern "C" {
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
bool taosDestroyThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
......
......@@ -38,7 +38,7 @@ pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
}
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
bool taosDestroyThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
......
......@@ -461,7 +461,7 @@ void vnodeStopWaitingThread(SVnodeObj* pVnode) {
if(loop == 0) {
vInfo("vgId:%d :SDEL force kill thread to quit. pthread=%p pWrite=%p", pVnode->vgId, pWaitThread->pthread, pWaitThread->param);
// thread not stop , so need kill
taosDestoryThread(pWaitThread->pthread);
taosDestroyThread(pWaitThread->pthread);
// write msg need remove from queue
SVWriteMsg* pWrite = (SVWriteMsg* )pWaitThread->param;
if (pWrite)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册