schemaless.c 8.5 KB
Newer Older
Z
zhaoyanggh 已提交
1
#include "os.h"
S
shenglian zhou 已提交
2 3 4 5 6 7 8 9 10
#include "taos.h"
#include "taoserror.h"

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

11
/* Set to true by the -v command-line flag; when set, insertLines() prints a
 * "begin" line and the result code / elapsed time for every batch it sends. */
bool verbose = false;
S
shenglian zhou 已提交
12

13
/*
 * Append the raw bytes of a thread id to buf as two-digit lowercase hex,
 * walking the object from its highest-addressed byte down to its first byte.
 *
 * id  - thread id to render.
 * buf - NUL-terminated string to append to; the caller must leave room for
 *       up to 2 * sizeof(size_t) hex digits plus the terminating NUL.
 *
 * The number of bytes rendered is capped at sizeof(size_t) so the output is
 * never longer than callers already budget for, and is never larger than
 * sizeof(id) so the loop cannot read past the pthread_t object on platforms
 * where pthread_t is narrower than size_t.
 */
void printThreadId(pthread_t id, char* buf)
{
  const unsigned char* bytes = (const unsigned char*) &id;
  size_t remaining = sizeof(id) < sizeof(size_t) ? sizeof(id) : sizeof(size_t);
  char* out = buf + strlen(buf);  /* locate the append position once */

  while (remaining > 0) {
    --remaining;
    out += sprintf(out, "%02x", bytes[remaining]);
  }
}

/*
 * Read the wall clock with gettimeofday() and return it as a single
 * microsecond count (seconds scaled by 1,000,000 plus the microsecond part).
 */
static int64_t getTimeInUs() {
  struct timeval now;
  gettimeofday(&now, NULL);

  int64_t wholeSeconds = (int64_t)now.tv_sec;
  int64_t microPart = (int64_t)now.tv_usec;
  return wholeSeconds * 1000000L + microPart;
}

26
/* One batch of protocol lines that is submitted with a single insert call. */
typedef struct {
  char** lines;     /* array of NUL-terminated line strings */
  int    numLines;  /* number of valid entries in lines */
} SThreadLinesBatch;

typedef struct  {
  TAOS* taos;
  int numBatches;
34
  SThreadLinesBatch *batches;
35
  int64_t costTime;
36 37 38 39
} SThreadInsertArgs;

static void* insertLines(void* args) {
  SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
G
Ganlin Zhao 已提交
40
  char tidBuf[32] = {0};
41
  printThreadId(pthread_self(), tidBuf);
42 43
  for (int i = 0; i < insertArgs->numBatches; ++i) {
    SThreadLinesBatch* batch = insertArgs->batches + i;
44
    if (verbose) printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
45
    int64_t begin = getTimeInUs();
46 47 48
    //int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
     TAOS_RES * res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS);
     int32_t code = taos_errno(res);
49 50
    int64_t end = getTimeInUs();
    insertArgs->costTime += end - begin;
51
    if (verbose) printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
52
  }
53 54 55
  return NULL;
}

56 57 58 59 60 61 62 63
/*
 * Build the printf-style template used to generate one protocol line.
 *
 * The resulting template contains exactly three conversions, in order:
 * %d (super table index), %d (value of tag t3) and %lld (timestamp).
 *
 * lineTemplate - destination buffer; any previous content is overwritten.
 * templateLen  - capacity of lineTemplate in bytes, including the NUL.
 * numFields    - requested width:
 *                  <= 4   fixed narrow sample (one tag, four columns),
 *                  <= 13  fixed sample with 9 tags and 13 columns,
 *                  > 13   generated: columns below numFields*2/5 are i32,
 *                         those below numFields*4/5 are f64, the rest are
 *                         quoted strings.
 *
 * Returns 0 on success, -1 if lineTemplate is NULL, templateLen is not
 * positive, or the template did not fit (the buffer then holds a
 * NUL-terminated truncated template).
 *
 * NOTE(review): in the generated form the column indices numFields*2/5 and
 * numFields*4/5 are never emitted, so only numFields-2 columns are produced.
 * That matches the previous behaviour and is kept on purpose here; confirm
 * whether it is intended.
 */
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
  if (lineTemplate == NULL || templateLen <= 0) {
    return -1;
  }
  const size_t cap = (size_t)templateLen;
  int n;

  if (numFields <= 13) {
    const char* sample;
    if (numFields <= 4) {
      sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lldms";
    } else {
      sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lld";
    }
    n = snprintf(lineTemplate, cap, "%s", sample);
    return (n < 0 || (size_t)n >= cap) ? -1 : 0;
  }

  /* Generated form: measurement and tags first, written from offset 0 so the
   * result does not depend on what the caller left in the buffer. */
  n = snprintf(lineTemplate, cap, "%s", "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32 ");
  if (n < 0 || (size_t)n >= cap) {
    return -1;
  }
  size_t used = (size_t)n;  /* bytes written so far, excluding the NUL */

  const int i32End = numFields*2/5;  /* columns [0, i32End) are i32 */
  const int f64End = numFields*4/5;  /* columns (i32End, f64End) are f64 */

  for (int i = 0; i < numFields; ++i) {
    if (i == i32End || i == f64End) {
      continue;  /* boundary indices are skipped, see NOTE(review) above */
    }
    const char* columnFormat;
    if (i < i32End) {
      columnFormat = "c%d=%di32,";
    } else if (i < f64End) {
      columnFormat = "c%d=%d.43f64,";
    } else {
      columnFormat = "c%d=\"%d\",";
    }
    n = snprintf(lineTemplate + used, cap - used, columnFormat, i, i);
    if (n < 0 || (size_t)n >= cap - used) {
      return -1;
    }
    used += (size_t)n;
  }

  /* numFields > 13 guarantees at least one column was written, so the last
   * byte is the trailing comma; overwrite it with the timestamp suffix. */
  used -= 1;
  n = snprintf(lineTemplate + used, cap - used, "%s", " %lldms");
  if (n < 0 || (size_t)n >= cap - used) {
    return -1;
  }
  return 0;
}

S
shenglian zhou 已提交
91
int main(int argc, char* argv[]) {
92
  int numThreads = 8;
93
  int maxBatchesPerThread = 1024;	
94 95 96 97 98 99 100 101 102

  int numSuperTables = 1;
  int numChildTables = 256;
  int numRowsPerChildTable = 8192;
  int numFields = 13;

  int maxLinesPerBatch = 16384;

  int opt;
103
  while ((opt = getopt(argc, argv, "s:c:r:f:t:b:hv")) != -1) {
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
    switch (opt) {
      case 's':
        numSuperTables = atoi(optarg);
        break;
      case 'c':
        numChildTables = atoi(optarg);
        break;
      case 'r':
        numRowsPerChildTable = atoi(optarg);
        break;
      case 'f':
        numFields = atoi(optarg);
        break;
      case 't':
        numThreads = atoi(optarg);
        break;
120
      case 'b':
121 122
        maxLinesPerBatch = atoi(optarg);
        break;
123 124
      case 'v':
        verbose = true;
G
Ganlin Zhao 已提交
125
        break;
126
      case 'h':
127
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -v\n",
128 129 130
                argv[0]);
        exit(0);
      default: /* '?' */
131
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -b maxlines_per_batch -v\n",
132 133 134 135 136
                argv[0]);
        exit(-1);
    }
  }

Z
zhaoyanggh 已提交
137
  TAOS_RES*   result;
S
shenglian zhou 已提交
138 139 140 141 142 143 144 145 146 147 148
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";

  taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
  if (taos == NULL) {
    printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
    exit(1);
  }

149
  maxBatchesPerThread = (numSuperTables*numChildTables*numRowsPerChildTable)/(numThreads * maxLinesPerBatch) + 1;
150

S
shenglian zhou 已提交
151 152 153 154 155 156 157
  char* info = taos_get_server_info(taos);
  printf("server info: %s\n", info);
  info = taos_get_client_info(taos);
  printf("client info: %s\n", info);
  result = taos_query(taos, "drop database if exists db;");
  taos_free_result(result);
  usleep(100000);
158
  result = taos_query(taos, "create database db precision 'us';");
S
shenglian zhou 已提交
159 160 161 162 163
  taos_free_result(result);
  usleep(100000);

  (void)taos_select_db(taos, "db");

Z
zhaoyanggh 已提交
164
  time_t  ct = time(0);
165
  int64_t ts = ct * 1000 ;
166 167

  char* lineTemplate = calloc(65536, sizeof(char));
168
  getLineTemplate(lineTemplate, 65535, numFields);
S
shenglian zhou 已提交
169

170
  printf("setup supertables...");
171 172 173
  {
    char** linesStb = calloc(numSuperTables, sizeof(char*));
    for (int i = 0; i < numSuperTables; i++) {
174 175
      char* lineStb = calloc(strlen(lineTemplate)+128, 1);
      snprintf(lineStb, strlen(lineTemplate)+128, lineTemplate, i,
176 177
               numSuperTables * numChildTables,
               ts + numSuperTables * numChildTables * numRowsPerChildTable);
178 179 180
      linesStb[i] = lineStb;
    }
    SThreadInsertArgs args = {0};
181
    args.batches = calloc(maxBatchesPerThread, sizeof(maxBatchesPerThread));
182
    args.taos = taos;
183 184
    args.batches[0].lines = linesStb;
    args.batches[0].numLines = numSuperTables;
185
    insertLines(&args);
186
    free(args.batches);
187 188 189 190 191 192 193
    for (int i = 0; i < numSuperTables; ++i) {
      free(linesStb[i]);
    }
    free(linesStb);
  }

  printf("generate lines...\n");
194 195
  pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
  SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
196
  for (int i = 0; i < numThreads; ++i) {
197
    argsThread[i].batches = calloc(maxBatchesPerThread, sizeof(SThreadLinesBatch));	  
198 199
    argsThread[i].taos = taos;
    argsThread[i].numBatches = 0;
200 201
  }

202 203 204 205
  int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
  int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
  if (totalLines % maxLinesPerBatch != 0) {
    totalBatches += 1;
S
shenglian zhou 已提交
206 207
  }

208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  char*** allBatches = calloc(totalBatches, sizeof(char**));
  for (int i = 0; i < totalBatches; ++i) {
    allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
    int threadNo = i % numThreads;
    int batchNo = i / numThreads;
    argsThread[threadNo].batches[batchNo].lines = allBatches[i];
    argsThread[threadNo].numBatches = batchNo + 1;
  }

  int l = 0;
  for (int i = 0; i < numSuperTables; ++i) {
    for (int j = 0; j < numChildTables; ++j) {
      for (int k = 0; k < numRowsPerChildTable; ++k) {
        int stIdx = i;
        int ctIdx = numSuperTables*numChildTables + j;
223
        char* line = calloc(strlen(lineTemplate)+128, 1);
224
        snprintf(line, strlen(lineTemplate)+128, lineTemplate, stIdx, ctIdx, ts + l);
225 226 227 228 229 230 231
        int batchNo = l / maxLinesPerBatch;
        int lineNo = l % maxLinesPerBatch;
        allBatches[batchNo][lineNo] =  line;
        argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
        ++l;
      }
    }
232 233
  }

234 235
  printf("begin multi-thread insertion...\n");
  int64_t begin = taosGetTimestampUs();
236

237 238 239 240 241 242
  for (int i=0; i < numThreads; ++i) {
    pthread_create(tids+i, NULL, insertLines, argsThread+i);
  }
  for (int i = 0; i < numThreads; ++i) {
    pthread_join(tids[i], NULL);
  }
243 244
  int64_t end = taosGetTimestampUs();

S
shenglian zhou 已提交
245
  size_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
246
  printf("TOTAL LINES: %zu\n", linesNum);
247 248 249 250
  printf("THREADS: %d\n", numThreads);
  printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
  double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
  printf("THROUGHPUT:%d/s\n", (int)throughput);
251 252 253 254 255 256

  for (int i = 0; i < totalBatches; ++i) {
    free(allBatches[i]);
  }
  free(allBatches);

257 258 259
  for (int i = 0; i < numThreads; i++) {
    free(argsThread[i].batches);
  }    
260 261
  free(argsThread);
  free(tids);
262

263
  free(lineTemplate);
264
  taos_close(taos);
S
shenglian zhou 已提交
265 266
  return 0;
}