schemaless.c 5.4 KB
Newer Older
Z
zhaoyanggh 已提交
1
#include "os.h"
#include "taos.h"
#include "taoserror.h"

#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

11
/*
 * Workload shape of the benchmark. main() multiplies these together to size
 * the generated data set:
 *   lines per thread = numSuperTables * numChildTables * numRowsPerChildTable
 *   total lines      = numThreads * lines per thread
 */
int numThreads = 8;               // number of insert threads started by main()
int numSuperTables = 8;           // super tables are named "sta<index>"
int numChildTables = 4; // per thread, per super table
int numRowsPerChildTable = 2048;  // rows generated for every child table
S
shenglian zhou 已提交
15

Z
zhaoyanggh 已提交
16 17
/*
 * Randomly permutes the first n pointers of `lines` in place (Fisher-Yates).
 *
 * Only the pointers are moved; the strings they point to are not touched.
 * Randomness comes from rand(), so the resulting order depends on how (and
 * whether) the caller seeded it with srand().
 *
 * lines: array of at least n entries; not dereferenced when n <= 1.
 * n:     number of entries to shuffle; 0 and 1 are no-ops.
 */
void shuffle(char** lines, size_t n) {
  if (n <= 1) {
    return;
  }

  for (size_t i = 0; i < n - 1; i++) {
    // Pick j in [i, n) by scaling rand() down with a division. The offset is
    // always < n - i: when n - i exceeds RAND_MAX the divisor is 1 and
    // rand() itself is already smaller than n - i.
    size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
    char*  t = lines[j];
    lines[j] = lines[i];
    lines[i] = t;
  }
}

28 29 30 31 32 33 34
/*
 * Appends a hexadecimal rendering of thread id `id` to the NUL-terminated
 * string already in `buf`.
 *
 * The raw bytes of `id` are emitted two hex digits each, from the highest
 * address down to the lowest. At most sizeof(size_t) bytes are rendered, and
 * never more than sizeof(id), so the function does not read past `id` when
 * pthread_t is narrower than size_t.
 *
 * id:  thread id to render.
 * buf: NUL-terminated destination; must have room for up to
 *      2 * sizeof(size_t) more characters plus the terminator.
 */
void printThreadId(pthread_t id, char* buf)
{
  const unsigned char* bytes = (const unsigned char*) &id;
  size_t count = sizeof(id) < sizeof(size_t) ? sizeof(id) : sizeof(size_t);
  char*  out = buf + strlen(buf);  // locate the end once instead of per byte

  for (size_t i = count; i; --i) {
    // size 3 = two hex digits + terminating NUL
    snprintf(out, 3, "%02x", (unsigned)bytes[i - 1]);
    out += 2;
  }
}

S
shenglian zhou 已提交
35 36 37 38 39 40
/*
 * Returns the current wall-clock time reported by gettimeofday(), expressed
 * as a single count of microseconds.
 */
static int64_t getTimeInUs() {
  struct timeval now;
  gettimeofday(&now, NULL);

  // Widen both fields before combining so the multiplication is done in 64 bits.
  const int64_t wholeSeconds = (int64_t)now.tv_sec;
  const int64_t extraMicros = (int64_t)now.tv_usec;
  return wholeSeconds * 1000000L + extraMicros;
}

41 42 43 44
/* Work item handed to insertLines(); one instance per insert call/thread. */
typedef struct  {
  TAOS* taos;        // connection the lines are inserted through
  char** lines;      // array of line strings passed to taos_insert_lines()
  int numLines;      // number of entries in `lines`
  int64_t costTime;  // output: microseconds spent in taos_insert_lines(), set by insertLines()
} SThreadInsertArgs;

static void* insertLines(void* args) {
  SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
  char tidBuf[32] = {0};
  printThreadId(pthread_self(), tidBuf);
  printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
  int64_t begin = getTimeInUs();
  int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines);
  int64_t end = getTimeInUs();
56
  insertArgs->costTime = end-begin;
57 58 59 60
  printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
  return NULL;
}

S
shenglian zhou 已提交
61
int main(int argc, char* argv[]) {
Z
zhaoyanggh 已提交
62
  TAOS_RES*   result;
S
shenglian zhou 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";

  taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
  if (taos == NULL) {
    printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
    exit(1);
  }

  char* info = taos_get_server_info(taos);
  printf("server info: %s\n", info);
  info = taos_get_client_info(taos);
  printf("client info: %s\n", info);
  result = taos_query(taos, "drop database if exists db;");
  taos_free_result(result);
  usleep(100000);
  result = taos_query(taos, "create database db precision 'ms';");
  taos_free_result(result);
  usleep(100000);

  (void)taos_select_db(taos, "db");

Z
zhaoyanggh 已提交
87
  time_t  ct = time(0);
S
shenglian zhou 已提交
88
  int64_t ts = ct * 1000;
89
  char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
S
shenglian zhou 已提交
90

91 92 93 94 95
  {
    char** linesStb = calloc(numSuperTables, sizeof(char*));
    for (int i = 0; i < numSuperTables; i++) {
      char* lineStb = calloc(512, 1);
      snprintf(lineStb, 512, lineFormat, i,
96 97
               numThreads * numSuperTables * numChildTables,
               ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable);
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
      linesStb[i] = lineStb;
    }
    SThreadInsertArgs args = {0};
    args.taos = taos;
    args.lines = linesStb;
    args.numLines = numSuperTables;
    insertLines(&args);
    for (int i = 0; i < numSuperTables; ++i) {
      free(linesStb[i]);
    }
    free(linesStb);
  }

  printf("generate lines...\n");
  char*** linesThread = calloc(numThreads, sizeof(char**));
  for (int i = 0; i < numThreads; ++i) {
114
    char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*));
115 116 117 118 119 120 121
    linesThread[i] = lines;
  }

  for (int t = 0; t < numThreads; ++t) {
    int l = 0;
    char** lines = linesThread[t];
    for (int i = 0; i < numSuperTables; ++i) {
122
      for (int j = 0; j < numChildTables; ++j) {
123 124
        for (int k = 0; k < numRowsPerChildTable; ++k) {
          int stIdx = i;
125
          int ctIdx = t*numSuperTables*numChildTables + j;
126 127 128 129 130
          char* line = calloc(512, 1);
          snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l);
          lines[l] = line;
          ++l;
        }
S
shenglian zhou 已提交
131 132 133 134
      }
    }
  }

135 136
  printf("shuffle lines...\n");
  for (int t = 0; t < numThreads; ++t) {
137
    shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable);
138 139
  }

140 141
  printf("begin multi-thread insertion...\n");
  int64_t begin = taosGetTimestampUs();
142 143 144 145 146
  pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
  SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
  for (int i=0; i < numThreads; ++i) {
    argsThread[i].lines = linesThread[i];
    argsThread[i].taos = taos;
147
    argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable;
148 149 150 151 152 153
    pthread_create(tids+i, NULL, insertLines, argsThread+i);
  }

  for (int i = 0; i < numThreads; ++i) {
    pthread_join(tids[i], NULL);
  }
154 155 156 157 158 159 160 161 162 163 164 165
  int64_t end = taosGetTimestampUs();

  int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable;
  printf("TOTAL LINES: %d\n", totalLines);
  printf("THREADS: %d\n", numThreads);
  int64_t sumTime = 0;
  for (int i=0; i<numThreads; ++i) {
    sumTime += argsThread[i].costTime;
  }
  printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
  double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
  printf("THROUGHPUT:%d/s\n", (int)throughput);
166 167 168 169 170 171
  free(argsThread);
  free(tids);
  for (int i = 0; i < numThreads; ++i) {
    free(linesThread[i]);
  }
  free(linesThread);
172

173
  taos_close(taos);
S
shenglian zhou 已提交
174 175
  return 0;
}