stream.c 4.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

Z
zhaoyanggh 已提交
16
#include <pthread.h>
H
hzcheng 已提交
17 18 19
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
20
#include <taos.h>  // include TDengine header file
Z
zhaoyanggh 已提交
21
#include <unistd.h>
H
hzcheng 已提交
22 23

typedef struct {
Z
zhaoyanggh 已提交
24 25 26
  char server_ip[64];
  char db_name[64];
  char tbl_name[64];
H
hzcheng 已提交
27 28
} param;

Z
zhaoyanggh 已提交
29 30
int   g_thread_exit_flag = 0;
void *insert_rows(void *sarg);
H
hzcheng 已提交
31

Z
zhaoyanggh 已提交
32
void streamCallBack(void *param, TAOS_RES *res, TAOS_ROW row) {
H
hzcheng 已提交
33 34 35 36 37 38 39 40 41 42 43
  // in this simple demo, it just print out the result
  char temp[128];

  TAOS_FIELD *fields = taos_fetch_fields(res);
  int         numFields = taos_num_fields(res);

  taos_print_row(temp, row, fields, numFields);

  printf("\n%s\n", temp);
}

Z
zhaoyanggh 已提交
44 45 46 47 48
int main(int argc, char *argv[]) {
  TAOS *taos;
  char  db_name[64];
  char  tbl_name[64];
  char  sql[1024] = {0};
H
hzcheng 已提交
49 50 51 52

  if (argc != 4) {
    printf("usage: %s server-ip dbname tblname\n", argv[0]);
    exit(0);
Z
zhaoyanggh 已提交
53
  }
H
hzcheng 已提交
54 55 56

  strcpy(db_name, argv[2]);
  strcpy(tbl_name, argv[3]);
Z
zhaoyanggh 已提交
57

H
hzcheng 已提交
58 59
  // create pthread to insert into row per second for stream calc
  param *t_param = (param *)malloc(sizeof(param));
Z
zhaoyanggh 已提交
60
  if (NULL == t_param) {
H
hzcheng 已提交
61 62 63
    printf("failed to malloc\n");
    exit(1);
  }
Z
zhaoyanggh 已提交
64
  memset(t_param, 0, sizeof(param));
H
hzcheng 已提交
65 66 67 68 69
  strcpy(t_param->server_ip, argv[1]);
  strcpy(t_param->db_name, db_name);
  strcpy(t_param->tbl_name, tbl_name);

  pthread_t pid;
Z
zhaoyanggh 已提交
70
  pthread_create(&pid, NULL, (void *(*)(void *))insert_rows, t_param);
H
hzcheng 已提交
71

Z
zhaoyanggh 已提交
72
  sleep(3);  // waiting for database is created.
H
hzcheng 已提交
73 74 75 76
  // open connection to database
  taos = taos_connect(argv[1], "root", "taosdata", db_name, 0);
  if (taos == NULL) {
    printf("failed to connet to server:%s\n", argv[1]);
Z
zhaoyanggh 已提交
77
    free(t_param);
H
hzcheng 已提交
78 79 80
    exit(1);
  }

Z
zhaoyanggh 已提交
81
  // starting stream calc,
S
slguan 已提交
82
  printf("please input stream SQL:[e.g., select count(*) from tblname interval(5s) sliding(2s);]\n");
H
hzcheng 已提交
83 84
  fgets(sql, sizeof(sql), stdin);
  if (sql[0] == 0) {
Z
zhaoyanggh 已提交
85
    printf("input NULL stream SQL, so exit!\n");
S
slguan 已提交
86 87
    free(t_param);
    exit(1);
H
hzcheng 已提交
88 89
  }

Z
zhaoyanggh 已提交
90
  // param is set to NULL in this demo, it shall be set to the pointer to app context
H
hzcheng 已提交
91 92
  TAOS_STREAM *pStream = taos_open_stream(taos, sql, streamCallBack, 0, NULL, NULL);
  if (NULL == pStream) {
Z
zhaoyanggh 已提交
93
    printf("failed to create stream\n");
S
slguan 已提交
94 95
    free(t_param);
    exit(1);
H
hzcheng 已提交
96
  }
Z
zhaoyanggh 已提交
97

J
jiyun99 已提交
98
  printf("press any key to exit\n");
H
hzcheng 已提交
99 100 101
  getchar();

  taos_close_stream(pStream);
Z
zhaoyanggh 已提交
102 103

  g_thread_exit_flag = 1;
H
hzcheng 已提交
104 105 106
  pthread_join(pid, NULL);

  taos_close(taos);
Z
zhaoyanggh 已提交
107
  free(t_param);
H
hzcheng 已提交
108 109 110 111

  return 0;
}

Z
zhaoyanggh 已提交
112 113 114 115
void *insert_rows(void *sarg) {
  TAOS * taos;
  char   command[1024] = {0};
  param *winfo = (param *)sarg;
H
hzcheng 已提交
116

Z
zhaoyanggh 已提交
117 118
  if (NULL == winfo) {
    printf("para is null!\n");
H
hzcheng 已提交
119 120 121 122 123
    exit(1);
  }

  taos = taos_connect(winfo->server_ip, "root", "taosdata", NULL, 0);
  if (taos == NULL) {
S
slguan 已提交
124 125
    printf("failed to connet to server:%s\n", winfo->server_ip);
    exit(1);
H
hzcheng 已提交
126
  }
Z
zhaoyanggh 已提交
127

H
hzcheng 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  // drop database
  sprintf(command, "drop database %s;", winfo->db_name);
  if (taos_query(taos, command) != 0) {
    printf("failed to drop database, reason:%s\n", taos_errstr(taos));
    exit(1);
  }

  // create database
  sprintf(command, "create database %s;", winfo->db_name);
  if (taos_query(taos, command) != 0) {
    printf("failed to create database, reason:%s\n", taos_errstr(taos));
    exit(1);
  }

  // use database
  sprintf(command, "use %s;", winfo->db_name);
  if (taos_query(taos, command) != 0) {
    printf("failed to use database, reason:%s\n", taos_errstr(taos));
    exit(1);
  }

  // create table
  sprintf(command, "create table %s (ts timestamp, speed int);", winfo->tbl_name);
  if (taos_query(taos, command) != 0) {
    printf("failed to create table, reason:%s\n", taos_errstr(taos));
    exit(1);
  }

  // insert data
S
slguan 已提交
157
  int64_t begin = (int64_t)time(NULL);
Z
zhaoyanggh 已提交
158
  int     index = 0;
H
hzcheng 已提交
159
  while (1) {
S
slguan 已提交
160
    if (g_thread_exit_flag) break;
Z
zhaoyanggh 已提交
161

S
slguan 已提交
162
    index++;
S
slguan 已提交
163
    sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index);
H
hzcheng 已提交
164
    if (taos_query(taos, command)) {
S
slguan 已提交
165
      printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos));
H
hzcheng 已提交
166
    }
S
slguan 已提交
167
    sleep(1);
Z
zhaoyanggh 已提交
168
  }
H
hzcheng 已提交
169 170 171 172

  taos_close(taos);
  return 0;
}