提交 de59ed95 编写于 作者: Z zhaoyanggh

format prettier

上级 13b93963
此差异已折叠。
...@@ -20,9 +20,9 @@ ...@@ -20,9 +20,9 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#include <string.h>
#include <taos.h> #include <taos.h>
...@@ -33,14 +33,14 @@ int tablesSelectProcessed = 0; ...@@ -33,14 +33,14 @@ int tablesSelectProcessed = 0;
int64_t st, et; int64_t st, et;
typedef struct { typedef struct {
int id; int id;
TAOS *taos; TAOS * taos;
char name[16]; char name[16];
time_t timeStamp; time_t timeStamp;
int value; int value;
int rowsInserted; int rowsInserted;
int rowsTried; int rowsTried;
int rowsRetrieved; int rowsRetrieved;
} STable; } STable;
void taos_insert_call_back(void *param, TAOS_RES *tres, int code); void taos_insert_call_back(void *param, TAOS_RES *tres, int code);
...@@ -48,7 +48,7 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code); ...@@ -48,7 +48,7 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code);
void taos_error(TAOS *taos); void taos_error(TAOS *taos);
static void queryDB(TAOS *taos, char *command) { static void queryDB(TAOS *taos, char *command) {
int i; int i;
TAOS_RES *pSql = NULL; TAOS_RES *pSql = NULL;
int32_t code = -1; int32_t code = -1;
...@@ -57,12 +57,12 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -57,12 +57,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
pSql = NULL; pSql = NULL;
} }
pSql = taos_query(taos, command); pSql = taos_query(taos, command);
code = taos_errno(pSql); code = taos_errno(pSql);
if (0 == code) { if (0 == code) {
break; break;
} }
} }
if (code != 0) { if (code != 0) {
...@@ -76,15 +76,14 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -76,15 +76,14 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[]) {
{ TAOS * taos;
TAOS *taos; struct timeval systemTime;
struct timeval systemTime; int i;
int i; char sql[1024] = {0};
char sql[1024] = { 0 }; char prefix[20] = {0};
char prefix[20] = { 0 }; char db[128] = {0};
char db[128] = { 0 }; STable * tableList;
STable *tableList;
if (argc != 5) { if (argc != 5) {
printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]); printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]);
...@@ -101,8 +100,7 @@ int main(int argc, char *argv[]) ...@@ -101,8 +100,7 @@ int main(int argc, char *argv[])
memset(tableList, 0, size); memset(tableList, 0, size);
taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) if (taos == NULL) taos_error(taos);
taos_error(taos);
printf("success to connect to server\n"); printf("success to connect to server\n");
...@@ -122,7 +120,7 @@ int main(int argc, char *argv[]) ...@@ -122,7 +120,7 @@ int main(int argc, char *argv[])
sprintf(tableList[i].name, "%s%d", prefix, i); sprintf(tableList[i].name, "%s%d", prefix, i);
sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i); sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i);
queryDB(taos, sql); queryDB(taos, sql);
} }
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
for (i = 0; i < numOfTables; ++i) for (i = 0; i < numOfTables; ++i)
...@@ -138,7 +136,7 @@ int main(int argc, char *argv[]) ...@@ -138,7 +136,7 @@ int main(int argc, char *argv[])
tablesInsertProcessed = 0; tablesInsertProcessed = 0;
tablesSelectProcessed = 0; tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) { for (i = 0; i < numOfTables; ++i) {
// insert records in asynchronous API // insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i); sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i)); taos_query_a(taos, sql, taos_insert_call_back, (void *)(tableList + i));
...@@ -147,12 +145,12 @@ int main(int argc, char *argv[]) ...@@ -147,12 +145,12 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n"); printf("once insert finished, presse any key to query\n");
getchar(); getchar();
while(1) { while (1) {
if (tablesInsertProcessed < numOfTables) { if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n"); printf("wait for process finished\n");
sleep(1); sleep(1);
continue; continue;
} }
break; break;
} }
...@@ -161,9 +159,8 @@ int main(int argc, char *argv[]) ...@@ -161,9 +159,8 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
for (i = 0; i < numOfTables; ++i) { for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API // select records in asynchronous API
sprintf(sql, "select * from %s", tableList[i].name); sprintf(sql, "select * from %s", tableList[i].name);
taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i)); taos_query_a(taos, sql, taos_select_call_back, (void *)(tableList + i));
} }
...@@ -171,17 +168,17 @@ int main(int argc, char *argv[]) ...@@ -171,17 +168,17 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n"); printf("\nonce finished, press any key to exit\n");
getchar(); getchar();
while(1) { while (1) {
if (tablesSelectProcessed < numOfTables) { if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n"); printf("wait for process finished\n");
sleep(1); sleep(1);
continue; continue;
} }
break; break;
} }
for (i = 0; i<numOfTables; ++i) { for (i = 0; i < numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved); printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
} }
...@@ -193,60 +190,54 @@ int main(int argc, char *argv[]) ...@@ -193,60 +190,54 @@ int main(int argc, char *argv[])
return 0; return 0;
} }
void taos_error(TAOS *con) void taos_error(TAOS *con) {
{
fprintf(stderr, "TDengine error: %s\n", taos_errstr(con)); fprintf(stderr, "TDengine error: %s\n", taos_errstr(con));
taos_close(con); taos_close(con);
taos_cleanup(); taos_cleanup();
exit(1); exit(1);
} }
void taos_insert_call_back(void *param, TAOS_RES *tres, int code) void taos_insert_call_back(void *param, TAOS_RES *tres, int code) {
{ STable * pTable = (STable *)param;
STable *pTable = (STable *)param; struct timeval systemTime;
struct timeval systemTime; char sql[128];
char sql[128];
pTable->rowsTried++; pTable->rowsTried++;
if (code < 0) { if (code < 0) {
printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried); printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried);
} } else if (code == 0) {
else if (code == 0) {
printf("%s not inserted\n", pTable->name); printf("%s not inserted\n", pTable->name);
} } else {
else {
pTable->rowsInserted++; pTable->rowsInserted++;
} }
if (pTable->rowsTried < points) { if (pTable->rowsTried < points) {
// for this demo, insert another record // for this demo, insert another record
sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000+pTable->rowsTried*1000, pTable->rowsTried); sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000 + pTable->rowsTried * 1000,
pTable->rowsTried);
taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable); taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable);
} } else {
else {
printf("%d rows data are inserted into %s\n", points, pTable->name); printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesInsertProcessed++; tablesInsertProcessed++;
if (tablesInsertProcessed >= numOfTables) { if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables); printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points * numOfTables);
} }
} }
taos_free_result(tres); taos_free_result(tres);
} }
void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) {
{ STable * pTable = (STable *)param;
STable *pTable = (STable *)param;
struct timeval systemTime; struct timeval systemTime;
if (numOfRows > 0) { if (numOfRows > 0) {
for (int i = 0; i < numOfRows; ++i) {
for (int i = 0; i<numOfRows; ++i) {
// synchronous API to retrieve a row from batch of records // synchronous API to retrieve a row from batch of records
/*TAOS_ROW row = */(void)taos_fetch_row(tres); /*TAOS_ROW row = */ (void)taos_fetch_row(tres);
// process row // process row
} }
...@@ -255,12 +246,10 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) ...@@ -255,12 +246,10 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
// retrieve next batch of rows // retrieve next batch of rows
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
} } else {
else { if (numOfRows < 0) printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
if (numOfRows < 0)
printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows);
//taos_free_result(tres); // taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name); printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesSelectProcessed++; tablesSelectProcessed++;
...@@ -272,19 +261,15 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) ...@@ -272,19 +261,15 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
taos_free_result(tres); taos_free_result(tres);
} }
} }
void taos_select_call_back(void *param, TAOS_RES *tres, int code) void taos_select_call_back(void *param, TAOS_RES *tres, int code) {
{
STable *pTable = (STable *)param; STable *pTable = (STable *)param;
if (code == 0 && tres) { if (code == 0 && tres) {
// asynchronous API to fetch a batch of records // asynchronous API to fetch a batch of records
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
} } else {
else {
printf("%s select failed, code:%d\n", pTable->name, code); printf("%s select failed, code:%d\n", pTable->name, code);
taos_free_result(tres); taos_free_result(tres);
taos_cleanup(); taos_cleanup();
......
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
// TAOS standard API example. The same syntax as MySQL, but only a subset // TAOS standard API example. The same syntax as MySQL, but only a subset
// to compile: gcc -o demo demo.c -ltaos // to compile: gcc -o demo demo.c -ltaos
#include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <inttypes.h>
#include <taos.h> // TAOS header file #include <taos.h> // TAOS header file
static void queryDB(TAOS *taos, char *command) { static void queryDB(TAOS *taos, char *command) {
int i; int i;
TAOS_RES *pSql = NULL; TAOS_RES *pSql = NULL;
int32_t code = -1; int32_t code = -1;
...@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
pSql = NULL; pSql = NULL;
} }
pSql = taos_query(taos, command); pSql = taos_query(taos, command);
code = taos_errno(pSql); code = taos_errno(pSql);
if (0 == code) { if (0 == code) {
break; break;
} }
} }
if (code != 0) { if (code != 0) {
...@@ -53,7 +53,7 @@ static void queryDB(TAOS *taos, char *command) { ...@@ -53,7 +53,7 @@ static void queryDB(TAOS *taos, char *command) {
void Test(TAOS *taos, char *qstr, int i); void Test(TAOS *taos, char *qstr, int i);
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char qstr[1024]; char qstr[1024];
// connect to server // connect to server
if (argc < 2) { if (argc < 2) {
...@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) { ...@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) {
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/); printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1); exit(1);
} }
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
...@@ -72,28 +72,30 @@ int main(int argc, char *argv[]) { ...@@ -72,28 +72,30 @@ int main(int argc, char *argv[]) {
taos_close(taos); taos_close(taos);
taos_cleanup(); taos_cleanup();
} }
void Test(TAOS *taos, char *qstr, int index) { void Test(TAOS *taos, char *qstr, int index) {
printf("==================test at %d\n================================", index); printf("==================test at %d\n================================", index);
queryDB(taos, "drop database if exists demo"); queryDB(taos, "drop database if exists demo");
queryDB(taos, "create database demo"); queryDB(taos, "create database demo");
TAOS_RES *result; TAOS_RES *result;
queryDB(taos, "use demo"); queryDB(taos, "use demo");
queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"); queryDB(taos,
"create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))");
printf("success to create table\n"); printf("success to create table\n");
int i = 0; int i = 0;
for (i = 0; i < 10; ++i) { for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello"); sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')",
(uint64_t)(1546300800000 + i * 1000), i, i, i, i * 10000000, i * 1.0, i * 2.0, "hello");
printf("qstr: %s\n", qstr); printf("qstr: %s\n", qstr);
// note: how do you wanna do if taos_query returns non-NULL // note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) { // if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos)); // printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// } // }
TAOS_RES *result1 = taos_query(taos, qstr); TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) { if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1)); printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1); taos_free_result(result1);
exit(1); exit(1);
} else { } else {
...@@ -107,7 +109,7 @@ void Test(TAOS *taos, char *qstr, int index) { ...@@ -107,7 +109,7 @@ void Test(TAOS *taos, char *qstr, int index) {
sprintf(qstr, "SELECT * FROM m1"); sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr); result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) { if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result)); printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result); taos_free_result(result);
exit(1); exit(1);
} }
...@@ -130,4 +132,3 @@ void Test(TAOS *taos, char *qstr, int index) { ...@@ -130,4 +132,3 @@ void Test(TAOS *taos, char *qstr, int index) {
taos_free_result(result); taos_free_result(result);
printf("====demo end====\n\n"); printf("====demo end====\n\n");
} }
...@@ -21,103 +21,101 @@ ...@@ -21,103 +21,101 @@
#ifdef __APPLE__ #ifdef __APPLE__
#include "osEok.h" #include "osEok.h"
#else // __APPLE__ #else // __APPLE__
#include <sys/epoll.h> #include <sys/epoll.h>
#endif // __APPLE__ #endif // __APPLE__
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <fcntl.h>
#include <arpa/inet.h>
#include <libgen.h> #include <libgen.h>
#include <locale.h> #include <locale.h>
#include <netdb.h> #include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) #define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#define A(statement, fmt, ...) do { \ #define A(statement, fmt, ...) \
if (statement) break; \ do { \
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \ if (statement) break; \
basename(__FILE__), __LINE__, __func__, \ fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", basename(__FILE__), __LINE__, __func__, \
#statement, errno, strerror(errno), \ #statement, errno, strerror(errno), ##__VA_ARGS__); \
##__VA_ARGS__); \ abort(); \
abort(); \ } while (0)
} while (0)
#define E(fmt, ...) do { \ #define E(fmt, ...) \
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \ do { \
basename(__FILE__), __LINE__, __func__, \ fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", basename(__FILE__), __LINE__, __func__, errno, strerror(errno), \
errno, strerror(errno), \ ##__VA_ARGS__); \
##__VA_ARGS__); \ } while (0)
} while (0)
#include "os.h" #include "os.h"
typedef struct ep_s ep_t; typedef struct ep_s ep_t;
struct ep_s { struct ep_s {
int ep; int ep;
pthread_mutex_t lock; pthread_mutex_t lock;
int sv[2]; // 0 for read, 1 for write; int sv[2]; // 0 for read, 1 for write;
pthread_t thread; pthread_t thread;
volatile unsigned int stopping:1; volatile unsigned int stopping : 1;
volatile unsigned int waiting:1; volatile unsigned int waiting : 1;
volatile unsigned int wakenup:1; volatile unsigned int wakenup : 1;
}; };
static int ep_dummy = 0; static int ep_dummy = 0;
static ep_t* ep_create(void); static ep_t *ep_create(void);
static void ep_destroy(ep_t *ep); static void ep_destroy(ep_t *ep);
static void* routine(void* arg); static void *routine(void *arg);
static int open_listen(unsigned short port); static int open_listen(unsigned short port);
typedef struct fde_s fde_t; typedef struct fde_s fde_t;
struct fde_s { struct fde_s {
int skt; int skt;
void (*on_event)(ep_t *ep, struct epoll_event *events, fde_t *client); void (*on_event)(ep_t *ep, struct epoll_event *events, fde_t *client);
}; };
static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client); static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client); static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
#define usage(arg0, fmt, ...) do { \ #define usage(arg0, fmt, ...) \
if (fmt[0]) { \ do { \
fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \ if (fmt[0]) { \
} \ fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \
fprintf(stderr, "usage:\n"); \ } \
fprintf(stderr, " %s -l <port> : specify listenning port\n", arg0); \ fprintf(stderr, "usage:\n"); \
} while (0) fprintf(stderr, " %s -l <port> : specify listenning port\n", arg0); \
} while (0)
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char *prg = basename(argv[0]); char *prg = basename(argv[0]);
if (argc==1) { if (argc == 1) {
usage(prg, ""); usage(prg, "");
return 0; return 0;
} }
ep_t* ep = ep_create(); ep_t *ep = ep_create();
A(ep, "failed"); A(ep, "failed");
for (int i=1; i<argc; ++i) { for (int i = 1; i < argc; ++i) {
const char *arg = argv[i]; const char *arg = argv[i];
if (0==strcmp(arg, "-l")) { if (0 == strcmp(arg, "-l")) {
++i; ++i;
if (i>=argc) { if (i >= argc) {
usage(prg, "expecting <port> after -l, but got nothing"); usage(prg, "expecting <port> after -l, but got nothing");
return 1; // confirmed potential leakage return 1; // confirmed potential leakage
} }
arg = argv[i]; arg = argv[i];
int port = atoi(arg); int port = atoi(arg);
int skt = open_listen(port); int skt = open_listen(port);
if (skt==-1) continue; if (skt == -1) continue;
fde_t *client = (fde_t*)calloc(1, sizeof(*client)); fde_t *client = (fde_t *)calloc(1, sizeof(*client));
if (!client) { if (!client) {
E("out of memory"); E("out of memory");
close(skt); close(skt);
...@@ -126,32 +124,32 @@ int main(int argc, char *argv[]) { ...@@ -126,32 +124,32 @@ int main(int argc, char *argv[]) {
client->skt = skt; client->skt = skt;
client->on_event = listen_event; client->on_event = listen_event;
struct epoll_event ev = {0}; struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = client; ev.data.ptr = client;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
continue; continue;
} }
usage(prg, "unknown argument: [%s]", arg); usage(prg, "unknown argument: [%s]", arg);
return 1; return 1;
} }
char *line = NULL; char * line = NULL;
size_t linecap = 0; size_t linecap = 0;
ssize_t linelen; ssize_t linelen;
while ((linelen = getline(&line, &linecap, stdin)) > 0) { while ((linelen = getline(&line, &linecap, stdin)) > 0) {
line[strlen(line)-1] = '\0'; line[strlen(line) - 1] = '\0';
if (0==strcmp(line, "exit")) break; if (0 == strcmp(line, "exit")) break;
if (0==strcmp(line, "quit")) break; if (0 == strcmp(line, "quit")) break;
if (line==strstr(line, "close")) { if (line == strstr(line, "close")) {
int fd = 0; int fd = 0;
sscanf(line, "close %d", &fd); sscanf(line, "close %d", &fd);
if (fd<=2) { if (fd <= 2) {
fprintf(stderr, "fd [%d] invalid\n", fd); fprintf(stderr, "fd [%d] invalid\n", fd);
continue; continue;
} }
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), ""); A(0 == epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), "");
continue; continue;
} }
if (strlen(line)==0) continue; if (strlen(line) == 0) continue;
fprintf(stderr, "unknown cmd:[%s]\n", line); fprintf(stderr, "unknown cmd:[%s]\n", line);
} }
ep_destroy(ep); ep_destroy(ep);
...@@ -159,69 +157,69 @@ int main(int argc, char *argv[]) { ...@@ -159,69 +157,69 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
ep_t* ep_create(void) { ep_t *ep_create(void) {
ep_t *ep = (ep_t*)calloc(1, sizeof(*ep)); ep_t *ep = (ep_t *)calloc(1, sizeof(*ep));
A(ep, "out of memory"); A(ep, "out of memory");
A(-1!=(ep->ep = epoll_create(1)), ""); A(-1 != (ep->ep = epoll_create(1)), "");
ep->sv[0] = -1; ep->sv[0] = -1;
ep->sv[1] = -1; ep->sv[1] = -1;
A(0==socketpair(AF_LOCAL, SOCK_STREAM, 0, ep->sv), ""); A(0 == socketpair(AF_LOCAL, SOCK_STREAM, 0, ep->sv), "");
A(0==pthread_mutex_init(&ep->lock, NULL), ""); A(0 == pthread_mutex_init(&ep->lock, NULL), "");
A(0==pthread_mutex_lock(&ep->lock), ""); A(0 == pthread_mutex_lock(&ep->lock), "");
struct epoll_event ev = {0}; struct epoll_event ev = {0};
ev.events = EPOLLIN; ev.events = EPOLLIN;
ev.data.ptr = &ep_dummy; ev.data.ptr = &ep_dummy;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, ep->sv[0], &ev), ""); A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, ep->sv[0], &ev), "");
A(0==pthread_create(&ep->thread, NULL, routine, ep), ""); A(0 == pthread_create(&ep->thread, NULL, routine, ep), "");
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0 == pthread_mutex_unlock(&ep->lock), "");
return ep; return ep;
} }
static void ep_destroy(ep_t *ep) { static void ep_destroy(ep_t *ep) {
A(ep, "invalid argument"); A(ep, "invalid argument");
ep->stopping = 1; ep->stopping = 1;
A(1==send(ep->sv[1], "1", 1, 0), ""); A(1 == send(ep->sv[1], "1", 1, 0), "");
A(0==pthread_join(ep->thread, NULL), ""); A(0 == pthread_join(ep->thread, NULL), "");
A(0==pthread_mutex_destroy(&ep->lock), ""); A(0 == pthread_mutex_destroy(&ep->lock), "");
A(0==close(ep->sv[0]), ""); A(0 == close(ep->sv[0]), "");
A(0==close(ep->sv[1]), ""); A(0 == close(ep->sv[1]), "");
A(0==close(ep->ep), ""); A(0 == close(ep->ep), "");
free(ep); free(ep);
} }
static void* routine(void* arg) { static void *routine(void *arg) {
A(arg, "invalid argument"); A(arg, "invalid argument");
ep_t *ep = (ep_t*)arg; ep_t *ep = (ep_t *)arg;
while (!ep->stopping) { while (!ep->stopping) {
struct epoll_event evs[10]; struct epoll_event evs[10];
memset(evs, 0, sizeof(evs)); memset(evs, 0, sizeof(evs));
A(0==pthread_mutex_lock(&ep->lock), ""); A(0 == pthread_mutex_lock(&ep->lock), "");
A(ep->waiting==0, "internal logic error"); A(ep->waiting == 0, "internal logic error");
ep->waiting = 1; ep->waiting = 1;
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0 == pthread_mutex_unlock(&ep->lock), "");
int r = epoll_wait(ep->ep, evs, sizeof(evs)/sizeof(evs[0]), -1); int r = epoll_wait(ep->ep, evs, sizeof(evs) / sizeof(evs[0]), -1);
A(r>0, "indefinite epoll_wait shall not timeout:[%d]", r); A(r > 0, "indefinite epoll_wait shall not timeout:[%d]", r);
A(0==pthread_mutex_lock(&ep->lock), ""); A(0 == pthread_mutex_lock(&ep->lock), "");
A(ep->waiting==1, "internal logic error"); A(ep->waiting == 1, "internal logic error");
ep->waiting = 0; ep->waiting = 0;
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0 == pthread_mutex_unlock(&ep->lock), "");
for (int i=0; i<r; ++i) { for (int i = 0; i < r; ++i) {
struct epoll_event *ev = evs + i; struct epoll_event *ev = evs + i;
if (ev->data.ptr == &ep_dummy) { if (ev->data.ptr == &ep_dummy) {
char c = '\0'; char c = '\0';
A(1==recv(ep->sv[0], &c, 1, 0), "internal logic error"); A(1 == recv(ep->sv[0], &c, 1, 0), "internal logic error");
A(0==pthread_mutex_lock(&ep->lock), ""); A(0 == pthread_mutex_lock(&ep->lock), "");
ep->wakenup = 0; ep->wakenup = 0;
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0 == pthread_mutex_unlock(&ep->lock), "");
continue; continue;
} }
A(ev->data.ptr, "internal logic error"); A(ev->data.ptr, "internal logic error");
fde_t *client = (fde_t*)ev->data.ptr; fde_t *client = (fde_t *)ev->data.ptr;
client->on_event(ep, ev, client); client->on_event(ep, ev, client);
continue; continue;
} }
...@@ -232,7 +230,7 @@ static void* routine(void* arg) { ...@@ -232,7 +230,7 @@ static void* routine(void* arg) {
static int open_listen(unsigned short port) { static int open_listen(unsigned short port) {
int r = 0; int r = 0;
int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (skt==-1) { if (skt == -1) {
E("socket() failed"); E("socket() failed");
return -1; return -1;
} }
...@@ -241,7 +239,7 @@ static int open_listen(unsigned short port) { ...@@ -241,7 +239,7 @@ static int open_listen(unsigned short port) {
si.sin_family = AF_INET; si.sin_family = AF_INET;
si.sin_addr.s_addr = inet_addr("0.0.0.0"); si.sin_addr.s_addr = inet_addr("0.0.0.0");
si.sin_port = htons(port); si.sin_port = htons(port);
r = bind(skt, (struct sockaddr*)&si, sizeof(si)); r = bind(skt, (struct sockaddr *)&si, sizeof(si));
if (r) { if (r) {
E("bind(%u) failed", port); E("bind(%u) failed", port);
break; break;
...@@ -257,7 +255,7 @@ static int open_listen(unsigned short port) { ...@@ -257,7 +255,7 @@ static int open_listen(unsigned short port) {
if (r) { if (r) {
E("getsockname() failed"); E("getsockname() failed");
} }
A(len==sizeof(si), "internal logic error"); A(len == sizeof(si), "internal logic error");
D("listenning at: %d", ntohs(si.sin_port)); D("listenning at: %d", ntohs(si.sin_port));
return skt; return skt;
} while (0); } while (0);
...@@ -268,10 +266,10 @@ static int open_listen(unsigned short port) { ...@@ -268,10 +266,10 @@ static int open_listen(unsigned short port) {
static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) { static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
A(ev->events & EPOLLIN, "internal logic error"); A(ev->events & EPOLLIN, "internal logic error");
struct sockaddr_in si = {0}; struct sockaddr_in si = {0};
socklen_t silen = sizeof(si); socklen_t silen = sizeof(si);
int skt = accept(client->skt, (struct sockaddr*)&si, &silen); int skt = accept(client->skt, (struct sockaddr *)&si, &silen);
A(skt!=-1, "internal logic error"); A(skt != -1, "internal logic error");
fde_t *server = (fde_t*)calloc(1, sizeof(*server)); fde_t *server = (fde_t *)calloc(1, sizeof(*server));
if (!server) { if (!server) {
close(skt); close(skt);
return; return;
...@@ -279,26 +277,25 @@ static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) { ...@@ -279,26 +277,25 @@ static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
server->skt = skt; server->skt = skt;
server->on_event = null_event; server->on_event = null_event;
struct epoll_event ee = {0}; struct epoll_event ee = {0};
ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ee.data.ptr = server; ee.data.ptr = server;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), ""); A(0 == epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), "");
} }
static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client) { static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
if (ev->events & EPOLLIN) { if (ev->events & EPOLLIN) {
char buf[8192]; char buf[8192];
int n = recv(client->skt, buf, sizeof(buf), 0); int n = recv(client->skt, buf, sizeof(buf), 0);
A(n>=0 && n<=sizeof(buf), "internal logic error:[%d]", n); A(n >= 0 && n <= sizeof(buf), "internal logic error:[%d]", n);
A(n==fwrite(buf, 1, n, stdout), "internal logic error"); A(n == fwrite(buf, 1, n, stdout), "internal logic error");
} }
if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
A(0==pthread_mutex_lock(&ep->lock), ""); A(0 == pthread_mutex_lock(&ep->lock), "");
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), ""); A(0 == epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), "");
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0 == pthread_mutex_unlock(&ep->lock), "");
close(client->skt); close(client->skt);
client->skt = -1; client->skt = -1;
client->on_event = NULL; client->on_event = NULL;
free(client); free(client);
} }
} }
...@@ -17,14 +17,11 @@ exe: ...@@ -17,14 +17,11 @@ exe:
gcc $(CFLAGS) ./stream.c -o $(ROOT)stream $(LFLAGS) gcc $(CFLAGS) ./stream.c -o $(ROOT)stream $(LFLAGS)
gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS) gcc $(CFLAGS) ./subscribe.c -o $(ROOT)subscribe $(LFLAGS)
gcc $(CFLAGS) ./apitest.c -o $(ROOT)apitest $(LFLAGS) gcc $(CFLAGS) ./apitest.c -o $(ROOT)apitest $(LFLAGS)
gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
clean: clean:
rm $(ROOT)asyncdemo rm $(ROOT)asyncdemo
rm $(ROOT)demo rm $(ROOT)demo
rm $(ROOT)prepare rm $(ROOT)prepare
rm $(ROOT)batchprepare
rm $(ROOT)stream rm $(ROOT)stream
rm $(ROOT)subscribe rm $(ROOT)subscribe
rm $(ROOT)apitest rm $(ROOT)apitest
rm $(ROOT)stmt
// TAOS standard API example. The same syntax as MySQL, but only a subet // TAOS standard API example. The same syntax as MySQL, but only a subet
// to compile: gcc -o prepare prepare.c -ltaos // to compile: gcc -o prepare prepare.c -ltaos
#include <stdio.h> #include <stdio.h>
...@@ -6,14 +6,12 @@ ...@@ -6,14 +6,12 @@
#include <string.h> #include <string.h>
#include "taos.h" #include "taos.h"
void taosMsleep(int mseconds); void taosMsleep(int mseconds);
int main(int argc, char *argv[]) int main(int argc, char *argv[]) {
{ TAOS * taos;
TAOS *taos; TAOS_RES * result;
TAOS_RES *result; int code;
int code;
TAOS_STMT *stmt; TAOS_STMT *stmt;
// connect to server // connect to server
...@@ -26,9 +24,9 @@ int main(int argc, char *argv[]) ...@@ -26,9 +24,9 @@ int main(int argc, char *argv[])
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1); exit(1);
} }
result = taos_query(taos, "drop database demo"); result = taos_query(taos, "drop database demo");
taos_free_result(result); taos_free_result(result);
result = taos_query(taos, "create database demo"); result = taos_query(taos, "create database demo");
...@@ -44,7 +42,9 @@ int main(int argc, char *argv[]) ...@@ -44,7 +42,9 @@ int main(int argc, char *argv[])
taos_free_result(result); taos_free_result(result);
// create table // create table
const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10))"; const char *sql =
"create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin "
"binary(40), blob nchar(10))";
result = taos_query(taos, sql); result = taos_query(taos, sql);
code = taos_errno(result); code = taos_errno(result);
if (code != 0) { if (code != 0) {
...@@ -59,16 +59,16 @@ int main(int argc, char *argv[]) ...@@ -59,16 +59,16 @@ int main(int argc, char *argv[])
// insert 10 records // insert 10 records
struct { struct {
int64_t ts; int64_t ts;
int8_t b; int8_t b;
int8_t v1; int8_t v1;
int16_t v2; int16_t v2;
int32_t v4; int32_t v4;
int64_t v8; int64_t v8;
float f4; float f4;
double f8; double f8;
char bin[40]; char bin[40];
char blob[80]; char blob[80];
} v = {0}; } v = {0};
stmt = taos_stmt_init(taos); stmt = taos_stmt_init(taos);
...@@ -138,7 +138,7 @@ int main(int argc, char *argv[]) ...@@ -138,7 +138,7 @@ int main(int argc, char *argv[])
sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)"; sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?)";
code = taos_stmt_prepare(stmt, sql, 0); code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){ if (code != 0) {
printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); printf("failed to execute taos_stmt_prepare. code:0x%x\n", code);
} }
v.ts = 1591060628000; v.ts = 1591060628000;
...@@ -203,4 +203,3 @@ int main(int argc, char *argv[]) ...@@ -203,4 +203,3 @@ int main(int argc, char *argv[])
return 0; return 0;
} }
#include "os.h"
#include "taos.h" #include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "os.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -12,15 +12,12 @@ int numSuperTables = 8; ...@@ -12,15 +12,12 @@ int numSuperTables = 8;
int numChildTables = 4; int numChildTables = 4;
int numRowsPerChildTable = 2048; int numRowsPerChildTable = 2048;
void shuffle(char**lines, size_t n) void shuffle(char** lines, size_t n) {
{ if (n > 1) {
if (n > 1)
{
size_t i; size_t i;
for (i = 0; i < n - 1; i++) for (i = 0; i < n - 1; i++) {
{
size_t j = i + rand() / (RAND_MAX / (n - i) + 1); size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j]; char* t = lines[j];
lines[j] = lines[i]; lines[j] = lines[i];
lines[i] = t; lines[i] = t;
} }
...@@ -34,7 +31,7 @@ static int64_t getTimeInUs() { ...@@ -34,7 +31,7 @@ static int64_t getTimeInUs() {
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
TAOS_RES *result; TAOS_RES* result;
const char* host = "127.0.0.1"; const char* host = "127.0.0.1";
const char* user = "root"; const char* user = "root";
const char* passwd = "taosdata"; const char* passwd = "taosdata";
...@@ -59,12 +56,16 @@ int main(int argc, char* argv[]) { ...@@ -59,12 +56,16 @@ int main(int argc, char* argv[]) {
(void)taos_select_db(taos, "db"); (void)taos_select_db(taos, "db");
time_t ct = time(0); time_t ct = time(0);
int64_t ts = ct * 1000; int64_t ts = ct * 1000;
char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; char* lineFormat =
"sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11="
"\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
int l = 0; int l = 0;
for (int i = 0; i < numSuperTables; ++i) { for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) { for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) { for (int k = 0; k < numRowsPerChildTable; ++k) {
...@@ -78,121 +79,142 @@ int main(int argc, char* argv[]) { ...@@ -78,121 +79,142 @@ int main(int argc, char* argv[]) {
shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable); shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable);
printf("%s\n", "begin taos_insert_lines"); printf("%s\n", "begin taos_insert_lines");
int64_t begin = getTimeInUs(); int64_t begin = getTimeInUs();
int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable); int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable);
int64_t end = getTimeInUs(); int64_t end = getTimeInUs();
printf("code: %d, %s. time used: %"PRId64"\n", code, tstrerror(code), end-begin); printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin);
char* lines_000_0[] = { char* lines_000_0[] = {
"sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us" "sta1,id=sta1_1,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7="
}; "2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
code = taos_insert_lines(taos, lines_000_0 , sizeof(lines_000_0)/sizeof(char*)); "c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639000us"};
code = taos_insert_lines(taos, lines_000_0, sizeof(lines_000_0) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_000_0 should return error\n"); printf("taos_insert_lines() lines_000_0 should return error\n");
return -1; return -1;
} }
char* lines_000_1[] = { char* lines_000_1[] = {
"sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001" "sta2,id=\"sta2_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=255u8,t6=32770u16,"
}; "t7=2147483699u32,t8=9223372036854775899u64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12="
"L\"ncharTagValue\" "
code = taos_insert_lines(taos, lines_000_1 , sizeof(lines_000_1)/sizeof(char*)); "c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" "
"1626006833639001"};
code = taos_insert_lines(taos, lines_000_1, sizeof(lines_000_1) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_000_1 should return error\n"); printf("taos_insert_lines() lines_000_1 should return error\n");
return -1; return -1;
} }
char* lines_000_2[] = { char* lines_000_2[] = {
"sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0" "sta3,id=\"sta3_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
}; "22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=255u8,c6=32770u16,c7=2147483699u32,"
"c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_000_2 , sizeof(lines_000_2)/sizeof(char*)); code = taos_insert_lines(taos, lines_000_2, sizeof(lines_000_2) / sizeof(char*));
if (0 != code) { if (0 != code) {
printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_000_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_001_0[] = { char* lines_001_0[] = {
"sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us", "sta4,t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639000us",
}; };
code = taos_insert_lines(taos, lines_001_0 , sizeof(lines_001_0)/sizeof(char*)); code = taos_insert_lines(taos, lines_001_0, sizeof(lines_001_0) / sizeof(char*));
if (0 != code) { if (0 != code) {
printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_001_0 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_001_1[] = { char* lines_001_1[] = {
"sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639001" "sta5,id=\"sta5_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
}; "22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639001"};
code = taos_insert_lines(taos, lines_001_1 , sizeof(lines_001_1)/sizeof(char*)); code = taos_insert_lines(taos, lines_001_1, sizeof(lines_001_1) / sizeof(char*));
if (0 != code) { if (0 != code) {
printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_001_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_001_2[] = { char* lines_001_2[] = {
"sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" 0" "sta6,id=\"sta6_1\",t0=true,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t9=11.12345f32,t10="
}; "22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64,c11="
"\"binaryValue\",c12=L\"ncharValue\" 0"};
code = taos_insert_lines(taos, lines_001_2 , sizeof(lines_001_2)/sizeof(char*)); code = taos_insert_lines(taos, lines_001_2, sizeof(lines_001_2) / sizeof(char*));
if (0 != code) { if (0 != code) {
printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_001_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_002[] = { char* lines_002[] = {
"stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns", "stb,id=\"stb_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us", "t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms", "c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11=\"binaryValue\",c12=L\"ncharValue\" 1626006834s" "\"binaryValue\",c12=L\"ncharValue\" 1626006833639000000ns",
}; "stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
code = taos_insert_lines(taos, lines_002 , sizeof(lines_002)/sizeof(char*)); "c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833639019us",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006833640ms",
"stc,id=\"stc_1\",t20=t,t21=T,t22=true,t23=True,t24=TRUE,t25=f,t26=F,t27=false,t28=False,t29=FALSE,t10=33.12345,"
"t11=\"binaryTagValue\",t12=L\"ncharTagValue\" "
"c20=t,c21=T,c22=true,c23=True,c24=TRUE,c25=f,c26=F,c27=false,c28=False,c29=FALSE,c10=33.12345,c11="
"\"binaryValue\",c12=L\"ncharValue\" 1626006834s"};
code = taos_insert_lines(taos, lines_002, sizeof(lines_002) / sizeof(char*));
if (0 != code) { if (0 != code) {
printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_002 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
//Duplicate key check; // Duplicate key check;
char* lines_003_1[] = { char* lines_003_1[] = {"std,id=\"std_3_1\",t1=4i64,Id=\"std\",t2=true c1=true 1626006834s"};
"std,id=\"std_3_1\",t1=4i64,Id=\"std\",t2=true c1=true 1626006834s"
};
code = taos_insert_lines(taos, lines_003_1 , sizeof(lines_003_1)/sizeof(char*)); code = taos_insert_lines(taos, lines_003_1, sizeof(lines_003_1) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_003_1 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_003_1 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_003_2[] = { char* lines_003_2[] = {"std,id=\"std_3_2\",tag1=4i64,Tag2=true,tAg3=2,TaG2=\"dup!\" c1=true 1626006834s"};
"std,id=\"std_3_2\",tag1=4i64,Tag2=true,tAg3=2,TaG2=\"dup!\" c1=true 1626006834s"
};
code = taos_insert_lines(taos, lines_003_2 , sizeof(lines_003_2)/sizeof(char*)); code = taos_insert_lines(taos, lines_003_2, sizeof(lines_003_2) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_003_2 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_003_2 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_003_3[] = { char* lines_003_3[] = {"std,id=\"std_3_3\",tag1=4i64 field1=true,Field2=2,FIElD1=\"dup!\",fIeLd4=true 1626006834s"};
"std,id=\"std_3_3\",tag1=4i64 field1=true,Field2=2,FIElD1=\"dup!\",fIeLd4=true 1626006834s"
};
code = taos_insert_lines(taos, lines_003_3 , sizeof(lines_003_3)/sizeof(char*)); code = taos_insert_lines(taos, lines_003_3, sizeof(lines_003_3) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_003_3 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_003_3 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
} }
char* lines_003_4[] = { char* lines_003_4[] = {
"std,id=\"std_3_4\",tag1=4i64,dupkey=4i16,tag2=T field1=true,dUpkEy=1e3f32,field2=\"1234\" 1626006834s" "std,id=\"std_3_4\",tag1=4i64,dupkey=4i16,tag2=T field1=true,dUpkEy=1e3f32,field2=\"1234\" 1626006834s"};
};
code = taos_insert_lines(taos, lines_003_4 , sizeof(lines_003_4)/sizeof(char*)); code = taos_insert_lines(taos, lines_003_4, sizeof(lines_003_4) / sizeof(char*));
if (0 == code) { if (0 == code) {
printf("taos_insert_lines() lines_003_4 return code:%d (%s)\n", code, (char*)tstrerror(code)); printf("taos_insert_lines() lines_003_4 return code:%d (%s)\n", code, (char*)tstrerror(code));
return -1; return -1;
......
...@@ -13,24 +13,23 @@ ...@@ -13,24 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <taos.h> // include TDengine header file #include <taos.h> // include TDengine header file
#include <unistd.h>
typedef struct { typedef struct {
char server_ip[64]; char server_ip[64];
char db_name[64]; char db_name[64];
char tbl_name[64]; char tbl_name[64];
} param; } param;
int g_thread_exit_flag = 0; int g_thread_exit_flag = 0;
void* insert_rows(void *sarg); void *insert_rows(void *sarg);
void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row) void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row) {
{
// in this simple demo, it just print out the result // in this simple demo, it just print out the result
char temp[128]; char temp[128];
...@@ -42,85 +41,81 @@ void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row) ...@@ -42,85 +41,81 @@ void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row)
printf("\n%s\n", temp); printf("\n%s\n", temp);
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[]) {
{ TAOS *taos;
TAOS *taos; char db_name[64];
char db_name[64]; char tbl_name[64];
char tbl_name[64]; char sql[1024] = {0};
char sql[1024] = { 0 };
if (argc != 4) { if (argc != 4) {
printf("usage: %s server-ip dbname tblname\n", argv[0]); printf("usage: %s server-ip dbname tblname\n", argv[0]);
exit(0); exit(0);
} }
strcpy(db_name, argv[2]); strcpy(db_name, argv[2]);
strcpy(tbl_name, argv[3]); strcpy(tbl_name, argv[3]);
// create pthread to insert into row per second for stream calc // create pthread to insert into row per second for stream calc
param *t_param = (param *)malloc(sizeof(param)); param *t_param = (param *)malloc(sizeof(param));
if (NULL == t_param) if (NULL == t_param) {
{
printf("failed to malloc\n"); printf("failed to malloc\n");
exit(1); exit(1);
} }
memset(t_param, 0, sizeof(param)); memset(t_param, 0, sizeof(param));
strcpy(t_param->server_ip, argv[1]); strcpy(t_param->server_ip, argv[1]);
strcpy(t_param->db_name, db_name); strcpy(t_param->db_name, db_name);
strcpy(t_param->tbl_name, tbl_name); strcpy(t_param->tbl_name, tbl_name);
pthread_t pid; pthread_t pid;
pthread_create(&pid, NULL, (void * (*)(void *))insert_rows, t_param); pthread_create(&pid, NULL, (void *(*)(void *))insert_rows, t_param);
sleep(3); // waiting for database is created. sleep(3); // waiting for database is created.
// open connection to database // open connection to database
taos = taos_connect(argv[1], "root", "taosdata", db_name, 0); taos = taos_connect(argv[1], "root", "taosdata", db_name, 0);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connet to server:%s\n", argv[1]); printf("failed to connet to server:%s\n", argv[1]);
free(t_param); free(t_param);
exit(1); exit(1);
} }
// starting stream calc, // starting stream calc,
printf("please input stream SQL:[e.g., select count(*) from tblname interval(5s) sliding(2s);]\n"); printf("please input stream SQL:[e.g., select count(*) from tblname interval(5s) sliding(2s);]\n");
fgets(sql, sizeof(sql), stdin); fgets(sql, sizeof(sql), stdin);
if (sql[0] == 0) { if (sql[0] == 0) {
printf("input NULL stream SQL, so exit!\n"); printf("input NULL stream SQL, so exit!\n");
free(t_param); free(t_param);
exit(1); exit(1);
} }
// param is set to NULL in this demo, it shall be set to the pointer to app context // param is set to NULL in this demo, it shall be set to the pointer to app context
TAOS_STREAM *pStream = taos_open_stream(taos, sql, streamCallBack, 0, NULL, NULL); TAOS_STREAM *pStream = taos_open_stream(taos, sql, streamCallBack, 0, NULL, NULL);
if (NULL == pStream) { if (NULL == pStream) {
printf("failed to create stream\n"); printf("failed to create stream\n");
free(t_param); free(t_param);
exit(1); exit(1);
} }
printf("presss any key to exit\n"); printf("presss any key to exit\n");
getchar(); getchar();
taos_close_stream(pStream); taos_close_stream(pStream);
g_thread_exit_flag = 1; g_thread_exit_flag = 1;
pthread_join(pid, NULL); pthread_join(pid, NULL);
taos_close(taos); taos_close(taos);
free(t_param); free(t_param);
return 0; return 0;
} }
void *insert_rows(void *sarg) {
TAOS * taos;
char command[1024] = {0};
param *winfo = (param *)sarg;
void* insert_rows(void *sarg) if (NULL == winfo) {
{ printf("para is null!\n");
TAOS *taos;
char command[1024] = { 0 };
param *winfo = (param * )sarg;
if (NULL == winfo){
printf("para is null!\n");
exit(1); exit(1);
} }
...@@ -129,7 +124,7 @@ void* insert_rows(void *sarg) ...@@ -129,7 +124,7 @@ void* insert_rows(void *sarg)
printf("failed to connet to server:%s\n", winfo->server_ip); printf("failed to connet to server:%s\n", winfo->server_ip);
exit(1); exit(1);
} }
// drop database // drop database
sprintf(command, "drop database %s;", winfo->db_name); sprintf(command, "drop database %s;", winfo->db_name);
if (taos_query(taos, command) != 0) { if (taos_query(taos, command) != 0) {
...@@ -160,19 +155,18 @@ void* insert_rows(void *sarg) ...@@ -160,19 +155,18 @@ void* insert_rows(void *sarg)
// insert data // insert data
int64_t begin = (int64_t)time(NULL); int64_t begin = (int64_t)time(NULL);
int index = 0; int index = 0;
while (1) { while (1) {
if (g_thread_exit_flag) break; if (g_thread_exit_flag) break;
index++; index++;
sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index); sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index);
if (taos_query(taos, command)) { if (taos_query(taos, command)) {
printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos)); printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos));
} }
sleep(1); sleep(1);
} }
taos_close(taos); taos_close(taos);
return 0; return 0;
} }
...@@ -14,10 +14,10 @@ void print_result(TAOS_RES* res, int blockFetch) { ...@@ -14,10 +14,10 @@ void print_result(TAOS_RES* res, int blockFetch) {
int num_fields = taos_num_fields(res); int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res); TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0; int nRows = 0;
if (blockFetch) { if (blockFetch) {
nRows = taos_fetch_block(res, &row); nRows = taos_fetch_block(res, &row);
//for (int i = 0; i < nRows; i++) { // for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields); // taos_print_row(buf, row + i, fields, num_fields);
// puts(buf); // puts(buf);
//} //}
...@@ -34,15 +34,11 @@ void print_result(TAOS_RES* res, int blockFetch) { ...@@ -34,15 +34,11 @@ void print_result(TAOS_RES* res, int blockFetch) {
printf("%d rows consumed.\n", nRows); printf("%d rows consumed.\n", nRows);
} }
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) { print_result(res, *(int*)param); }
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res, *(int*)param);
}
void check_row_count(int line, TAOS_RES* res, int expected) { void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0; int actual = 0;
TAOS_ROW row; TAOS_ROW row;
while ((row = taos_fetch_row(res))) { while ((row = taos_fetch_row(res))) {
actual++; actual++;
} }
...@@ -53,16 +49,14 @@ void check_row_count(int line, TAOS_RES* res, int expected) { ...@@ -53,16 +49,14 @@ void check_row_count(int line, TAOS_RES* res, int expected) {
} }
} }
void do_query(TAOS* taos, const char* sql) { void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql); TAOS_RES* res = taos_query(taos, sql);
taos_free_result(res); taos_free_result(res);
} }
void run_test(TAOS* taos) { void run_test(TAOS* taos) {
do_query(taos, "drop database if exists test;"); do_query(taos, "drop database if exists test;");
usleep(100000); usleep(100000);
do_query(taos, "create database test;"); do_query(taos, "create database test;");
usleep(100000); usleep(100000);
...@@ -161,14 +155,13 @@ void run_test(TAOS* taos) { ...@@ -161,14 +155,13 @@ void run_test(TAOS* taos) {
taos_unsubscribe(tsub, 0); taos_unsubscribe(tsub, 0);
} }
int main(int argc, char* argv[]) {
int main(int argc, char *argv[]) {
const char* host = "127.0.0.1"; const char* host = "127.0.0.1";
const char* user = "root"; const char* user = "root";
const char* passwd = "taosdata"; const char* passwd = "taosdata";
const char* sql = "select * from meters;"; const char* sql = "select * from meters;";
const char* topic = "test-multiple"; const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0; int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) { if (strncmp(argv[i], "-h=", 3) == 0) {
...@@ -240,20 +233,21 @@ int main(int argc, char *argv[]) { ...@@ -240,20 +233,21 @@ int main(int argc, char *argv[]) {
if (tsub == NULL) { if (tsub == NULL) {
printf("failed to create subscription.\n"); printf("failed to create subscription.\n");
exit(0); exit(0);
} }
if (async) { if (async) {
getchar(); getchar();
} else while(1) { } else
TAOS_RES* res = taos_consume(tsub); while (1) {
if (res == NULL) { TAOS_RES* res = taos_consume(tsub);
printf("failed to consume data."); if (res == NULL) {
break; printf("failed to consume data.");
} else { break;
print_result(res, blockFetch); } else {
getchar(); print_result(res, blockFetch);
getchar();
}
} }
}
printf("total rows consumed: %d\n", nTotalRows); printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep); taos_unsubscribe(tsub, keep);
......
...@@ -15,8 +15,10 @@ exe: ...@@ -15,8 +15,10 @@ exe:
gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS) gcc $(CFLAGS) ./batchprepare.c -o $(ROOT)batchprepare $(LFLAGS)
gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS) gcc $(CFLAGS) ./stmtBatchTest.c -o $(ROOT)stmtBatchTest $(LFLAGS)
gcc $(CFLAGS) ./stmtTest.c -o $(ROOT)stmtTest $(LFLAGS) gcc $(CFLAGS) ./stmtTest.c -o $(ROOT)stmtTest $(LFLAGS)
gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
clean: clean:
rm $(ROOT)batchprepare rm $(ROOT)batchprepare
rm $(ROOT)stmtBatchTest rm $(ROOT)stmtBatchTest
rm $(ROOT)stmtTest rm $(ROOT)stmtTest
rm $(ROOT)stmt
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册