schemaless.c 9.9 KB
Newer Older
Z
zhaoyanggh 已提交
1
#include "os.h"
S
shenglian zhou 已提交
2 3 4 5 6 7 8 9 10
#include "taos.h"
#include "taoserror.h"

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

11
bool verbose = false;
S
shenglian zhou 已提交
12

S
shenglian zhou 已提交
13

14
void printThreadId(pthread_t id, char* buf)
S
shenglian zhou 已提交
15
{
16 17 18
  size_t i;
  for (i = sizeof(i); i; --i)
    sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1));
S
shenglian zhou 已提交
19 20 21 22 23 24 25 26
}

static int64_t getTimeInUs() {
  struct timeval systemTime;
  gettimeofday(&systemTime, NULL);
  return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}

27
typedef struct {
28 29
  char** lines;
  int numLines;
30 31 32 33
} SThreadLinesBatch;

typedef struct  {
  TAOS* taos;
S
shenglian zhou 已提交
34
  int protocol;
35
  int numBatches;
36
  SThreadLinesBatch *batches;
37
  int64_t costTime;
38 39 40 41
} SThreadInsertArgs;

static void* insertLines(void* args) {
  SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
G
Ganlin Zhao 已提交
42
  char tidBuf[32] = {0};
43
  printThreadId(pthread_self(), tidBuf);
44 45
  for (int i = 0; i < insertArgs->numBatches; ++i) {
    SThreadLinesBatch* batch = insertArgs->batches + i;
46
    if (verbose) printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
47
    int64_t begin = getTimeInUs();
48
    //int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
S
shenglian zhou 已提交
49 50
    TAOS_RES * res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->protocol, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
    int32_t code = taos_errno(res);
51 52
    int64_t end = getTimeInUs();
    insertArgs->costTime += end - begin;
53
    if (verbose) printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
54
  }
55 56 57
  return NULL;
}

S
shenglian zhou 已提交
58 59 60 61 62 63
int32_t getTelenetTemplate(char* lineTemplate, int templateLen) {
  char* sample = "sta%d %lld 44.3 t0=False t1=127i8 t2=32 t3=%di32 t4=9223372036854775807i64 t5=11.12345f32 t6=22.123456789f64 t7=\"hpxzrdiw\" t8=\"ncharTagValue\" t9=127i8";
  snprintf(lineTemplate, templateLen, "%s", sample);
  return 0;
}

64 65
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
  if (numFields <= 4) {
S
shenglian zhou 已提交
66
    char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lld";
67 68 69 70 71
    snprintf(lineTemplate, templateLen, "%s", sample);
    return 0;
  }

  if (numFields <= 13) {
72
     char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lld";
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
     snprintf(lineTemplate, templateLen, "%s", sample);
     return 0;
  }

  char* lineFormatTable = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32 ";
  snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "%s", lineFormatTable);

  int offset[] = {numFields*2/5, numFields*4/5, numFields};

  for (int i = 0; i < offset[0]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%di32,", i, i);
  }

  for (int i=offset[0]+1; i < offset[1]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%d.43f64,", i, i);
  }

  for (int i = offset[1]+1; i < offset[2]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i);
  }
S
shenglian zhou 已提交
93
  char* lineFormatTs = " %lld";
94 95 96 97 98
  snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs);

  return 0;
}

S
shenglian zhou 已提交
99 100 101 102 103 104 105 106 107
int32_t generateLine(char* line, int lineLen, char* lineTemplate, int protocol, int superTable, int childTable, int64_t ts) {
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
    snprintf(line, lineLen, lineTemplate, superTable, childTable, ts);               
  } else if (protocol == TSDB_SML_TELNET_PROTOCOL) {
    snprintf(line, lineLen, lineTemplate, superTable, ts, childTable);
  }
  return TSDB_CODE_SUCCESS;
}

S
shenglian zhou 已提交
108
int main(int argc, char* argv[]) {
109
  int numThreads = 8;
110
  int maxBatchesPerThread = 1024;	
111 112 113 114 115 116 117 118

  int numSuperTables = 1;
  int numChildTables = 256;
  int numRowsPerChildTable = 8192;
  int numFields = 13;

  int maxLinesPerBatch = 16384;

S
shenglian zhou 已提交
119 120
  int protocol = TSDB_SML_TELNET_PROTOCOL;

121
  int opt;
S
shenglian zhou 已提交
122
  while ((opt = getopt(argc, argv, "s:c:r:f:t:b:p:hv")) != -1) {
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
    switch (opt) {
      case 's':
        numSuperTables = atoi(optarg);
        break;
      case 'c':
        numChildTables = atoi(optarg);
        break;
      case 'r':
        numRowsPerChildTable = atoi(optarg);
        break;
      case 'f':
        numFields = atoi(optarg);
        break;
      case 't':
        numThreads = atoi(optarg);
        break;
139
      case 'b':
140 141
        maxLinesPerBatch = atoi(optarg);
        break;
142 143
      case 'v':
        verbose = true;
G
Ganlin Zhao 已提交
144
        break;
S
shenglian zhou 已提交
145 146 147 148 149 150 151 152 153
      case 'p':
        if (optarg[0] == 't') {
          protocol = TSDB_SML_TELNET_PROTOCOL;
        } else if (optarg[0] == 'l') {
          protocol = TSDB_SML_LINE_PROTOCOL;
        } else if (optarg[0] == 'j') {
          protocol = TSDB_SML_JSON_PROTOCOL;
        }
        break;
154
      case 'h':
S
shenglian zhou 已提交
155
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n",
156 157 158
                argv[0]);
        exit(0);
      default: /* '?' */
S
shenglian zhou 已提交
159
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -p [t|l|j] -v\n",
160 161 162 163 164
                argv[0]);
        exit(-1);
    }
  }

Z
zhaoyanggh 已提交
165
  TAOS_RES*   result;
166 167
  //const char* host = "127.0.0.1";
  const char* host = NULL;
S
shenglian zhou 已提交
168 169 170 171 172 173 174 175 176 177
  const char* user = "root";
  const char* passwd = "taosdata";

  taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
  if (taos == NULL) {
    printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
    exit(1);
  }

178
  maxBatchesPerThread = (numSuperTables*numChildTables*numRowsPerChildTable)/(numThreads * maxLinesPerBatch) + 1;
179

S
shenglian zhou 已提交
180 181 182 183 184 185 186
  char* info = taos_get_server_info(taos);
  printf("server info: %s\n", info);
  info = taos_get_client_info(taos);
  printf("client info: %s\n", info);
  result = taos_query(taos, "drop database if exists db;");
  taos_free_result(result);
  usleep(100000);
187
  result = taos_query(taos, "create database db precision 'us';");
S
shenglian zhou 已提交
188 189 190 191 192
  taos_free_result(result);
  usleep(100000);

  (void)taos_select_db(taos, "db");

Z
zhaoyanggh 已提交
193
  time_t  ct = time(0);
194
  int64_t ts = ct * 1000 ;
195 196

  char* lineTemplate = calloc(65536, sizeof(char));
S
shenglian zhou 已提交
197 198 199 200 201
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
    getLineTemplate(lineTemplate, 65535, numFields);
  } else if (protocol == TSDB_SML_TELNET_PROTOCOL ) {
    getTelenetTemplate(lineTemplate, 65535);
  }
S
shenglian zhou 已提交
202

203
  printf("setup supertables...");
204 205 206
  {
    char** linesStb = calloc(numSuperTables, sizeof(char*));
    for (int i = 0; i < numSuperTables; i++) {
207
      char* lineStb = calloc(strlen(lineTemplate)+128, 1);
S
shenglian zhou 已提交
208
      generateLine(lineStb, strlen(lineTemplate)+128, lineTemplate, protocol, i,
209 210
               numSuperTables * numChildTables,
               ts + numSuperTables * numChildTables * numRowsPerChildTable);
211 212 213
      linesStb[i] = lineStb;
    }
    SThreadInsertArgs args = {0};
S
shenglian zhou 已提交
214
    args.protocol = protocol;
215
    args.batches = calloc(maxBatchesPerThread, sizeof(maxBatchesPerThread));
216
    args.taos = taos;
217 218
    args.batches[0].lines = linesStb;
    args.batches[0].numLines = numSuperTables;
219
    insertLines(&args);
220
    free(args.batches);
221 222 223 224 225 226 227
    for (int i = 0; i < numSuperTables; ++i) {
      free(linesStb[i]);
    }
    free(linesStb);
  }

  printf("generate lines...\n");
228 229
  pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
  SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
230
  for (int i = 0; i < numThreads; ++i) {
231
    argsThread[i].batches = calloc(maxBatchesPerThread, sizeof(SThreadLinesBatch));	  
232 233
    argsThread[i].taos = taos;
    argsThread[i].numBatches = 0;
S
shenglian zhou 已提交
234
    argsThread[i].protocol = protocol;
235 236
  }

237 238 239 240
  int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
  int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
  if (totalLines % maxLinesPerBatch != 0) {
    totalBatches += 1;
S
shenglian zhou 已提交
241 242
  }

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
  char*** allBatches = calloc(totalBatches, sizeof(char**));
  for (int i = 0; i < totalBatches; ++i) {
    allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
    int threadNo = i % numThreads;
    int batchNo = i / numThreads;
    argsThread[threadNo].batches[batchNo].lines = allBatches[i];
    argsThread[threadNo].numBatches = batchNo + 1;
  }

  int l = 0;
  for (int i = 0; i < numSuperTables; ++i) {
    for (int j = 0; j < numChildTables; ++j) {
      for (int k = 0; k < numRowsPerChildTable; ++k) {
        int stIdx = i;
        int ctIdx = numSuperTables*numChildTables + j;
258
        char* line = calloc(strlen(lineTemplate)+128, 1);
S
shenglian zhou 已提交
259
        generateLine(line, strlen(lineTemplate)+128, lineTemplate, protocol, stIdx, ctIdx, ts + l);
260 261 262 263 264 265 266
        int batchNo = l / maxLinesPerBatch;
        int lineNo = l % maxLinesPerBatch;
        allBatches[batchNo][lineNo] =  line;
        argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
        ++l;
      }
    }
267 268
  }

269 270
  printf("begin multi-thread insertion...\n");
  int64_t begin = taosGetTimestampUs();
271

272 273 274 275 276 277
  for (int i=0; i < numThreads; ++i) {
    pthread_create(tids+i, NULL, insertLines, argsThread+i);
  }
  for (int i = 0; i < numThreads; ++i) {
    pthread_join(tids[i], NULL);
  }
278 279
  int64_t end = taosGetTimestampUs();

S
shenglian zhou 已提交
280
  size_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
281
  printf("TOTAL LINES: %zu\n", linesNum);
282 283 284 285
  printf("THREADS: %d\n", numThreads);
  printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
  double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
  printf("THROUGHPUT:%d/s\n", (int)throughput);
286 287 288 289 290 291

  for (int i = 0; i < totalBatches; ++i) {
    free(allBatches[i]);
  }
  free(allBatches);

292 293 294
  for (int i = 0; i < numThreads; i++) {
    free(argsThread[i].batches);
  }    
295 296
  free(argsThread);
  free(tids);
297

298
  free(lineTemplate);
299
  taos_close(taos);
S
shenglian zhou 已提交
300 301
  return 0;
}