提交 2ac9a78d 编写于 作者: X Xiaoyu Wang

feat: support 'select *, expr from ...' syntax

上级 dc769a28
...@@ -124,15 +124,13 @@ char* getCurrentTimeString(char* timeString) { ...@@ -124,15 +124,13 @@ char* getCurrentTimeString(char* timeString) {
return timeString; return timeString;
} }
static void tmqStop(int signum, void *info, void *ctx) { static void tmqStop(int signum, void* info, void* ctx) {
running = 0; running = 0;
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum); taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
} }
static void tmqSetSignalHandle() { static void tmqSetSignalHandle() { taosSetSignal(SIGINT, tmqStop); }
taosSetSignal(SIGINT, tmqStop);
}
void initLogFile() { void initLogFile() {
char filename[256]; char filename[256];
...@@ -433,7 +431,7 @@ static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields ...@@ -433,7 +431,7 @@ static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields
int32_t precision) { int32_t precision) {
for (int32_t i = 0; i < num_fields; i++) { for (int32_t i = 0; i < num_fields; i++) {
if (i > 0) { if (i > 0) {
taosFprintfFile(pFile, "\n"); taosFprintfFile(pFile, ",");
} }
shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision); shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
} }
...@@ -463,16 +461,16 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) ...@@ -463,16 +461,16 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg); const char* tbName = tmq_get_table_name(msg);
#if 0 #if 0
// get schema // get schema
//============================== stub =================================================// //============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) { for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes); taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
} }
//============================== stub =================================================// //============================== stub =================================================//
#endif #endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
...@@ -523,15 +521,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { ...@@ -523,15 +521,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
static int32_t g_once_commit_flag = 0; static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code); pError("tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) { if (0 == g_once_commit_flag) {
g_once_commit_flag = 1; g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
} }
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
} }
void build_consumer(SThreadInfo* pInfo) { void build_consumer(SThreadInfo* pInfo) {
...@@ -683,13 +681,13 @@ void* consumeThreadFunc(void* param) { ...@@ -683,13 +681,13 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) { if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
return NULL; return NULL;
} }
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0); assert(0);
return NULL; return NULL;
} }
...@@ -697,7 +695,7 @@ void* consumeThreadFunc(void* param) { ...@@ -697,7 +695,7 @@ void* consumeThreadFunc(void* param) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0); assert(0);
return NULL; return NULL;
} }
...@@ -718,13 +716,13 @@ void* consumeThreadFunc(void* param) { ...@@ -718,13 +716,13 @@ void* consumeThreadFunc(void* param) {
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err != 0) { if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err != 0) { if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
} }
pInfo->tmq = NULL; pInfo->tmq = NULL;
...@@ -871,4 +869,3 @@ int main(int32_t argc, char* argv[]) { ...@@ -871,4 +869,3 @@ int main(int32_t argc, char* argv[]) {
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册