schemaless.c 8.6 KB
Newer Older
Z
zhaoyanggh 已提交
1
#include "os.h"
S
shenglian zhou 已提交
2 3 4 5 6 7 8 9 10
#include "taos.h"
#include "taoserror.h"

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

11
#define MAX_THREAD_LINE_BATCHES 1024
S
shenglian zhou 已提交
12

13
void printThreadId(pthread_t id, char* buf)
S
shenglian zhou 已提交
14
{
15 16 17
  size_t i;
  for (i = sizeof(i); i; --i)
    sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1));
S
shenglian zhou 已提交
18 19 20 21 22 23 24 25
}

static int64_t getTimeInUs() {
  struct timeval systemTime;
  gettimeofday(&systemTime, NULL);
  return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
}

26
typedef struct {
27 28
  char** lines;
  int numLines;
29 30 31 32 33 34
} SThreadLinesBatch;

typedef struct  {
  TAOS* taos;
  int numBatches;
  SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES];
35
  int64_t costTime;
G
Ganlin Zhao 已提交
36 37
  int tsPrecision;
  int lineProtocol;
38 39 40 41
} SThreadInsertArgs;

static void* insertLines(void* args) {
  SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
G
Ganlin Zhao 已提交
42
  char tidBuf[32] = {0};
43
  printThreadId(pthread_self(), tidBuf);
44 45 46 47
  for (int i = 0; i < insertArgs->numBatches; ++i) {
    SThreadLinesBatch* batch = insertArgs->batches + i;
    printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
    int64_t begin = getTimeInUs();
G
Ganlin Zhao 已提交
48 49
    TAOS_RES *res = taos_schemaless_insert(insertArgs->taos, batch->lines, batch->numLines, insertArgs->lineProtocol, insertArgs->tsPrecision);
    int32_t code = taos_errno(res);
50 51
    int64_t end = getTimeInUs();
    insertArgs->costTime += end - begin;
G
Ganlin Zhao 已提交
52 53
    printf("code: %d, %s. affected lines:%d time used:%"PRId64", thread: 0x%s\n", code, taos_errstr(res), taos_affected_rows(res), end - begin, tidBuf);
    taos_free_result(res);
54
  }
55 56 57
  return NULL;
}

58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
int32_t getLineTemplate(char* lineTemplate, int templateLen, int numFields) {
  if (numFields <= 4) {
    char* sample = "sta%d,t3=%di32 c3=2147483647i32,c4=9223372036854775807i64,c9=11.12345f32,c10=22.123456789f64 %lldms";
    snprintf(lineTemplate, templateLen, "%s", sample);
    return 0;
  }

  if (numFields <= 13) {
     char* sample = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms";
     snprintf(lineTemplate, templateLen, "%s", sample);
     return 0;
  }

  char* lineFormatTable = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32 ";
  snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "%s", lineFormatTable);

  int offset[] = {numFields*2/5, numFields*4/5, numFields};

  for (int i = 0; i < offset[0]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%di32,", i, i);
  }

  for (int i=offset[0]+1; i < offset[1]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=%d.43f64,", i, i);
  }

  for (int i = offset[1]+1; i < offset[2]; ++i) {
    snprintf(lineTemplate+strlen(lineTemplate), templateLen-strlen(lineTemplate), "c%d=\"%d\",", i, i);
  }
  char* lineFormatTs = " %lldms";
  snprintf(lineTemplate+strlen(lineTemplate)-1, templateLen-strlen(lineTemplate)+1, "%s", lineFormatTs);

  return 0;
}

S
shenglian zhou 已提交
93
int main(int argc, char* argv[]) {
94 95 96 97 98 99 100 101
  int numThreads = 8;

  int numSuperTables = 1;
  int numChildTables = 256;
  int numRowsPerChildTable = 8192;
  int numFields = 13;

  int maxLinesPerBatch = 16384;
G
Ganlin Zhao 已提交
102 103
  int tsPrecision = TSDB_SML_TIMESTAMP_NOT_CONFIGURED;
  int lineProtocol = TSDB_SML_UNKNOWN_PROTOCOL;
104 105

  int opt;
G
Ganlin Zhao 已提交
106
  while ((opt = getopt(argc, argv, "s:c:r:f:t:m:p:P:h")) != -1) {
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
    switch (opt) {
      case 's':
        numSuperTables = atoi(optarg);
        break;
      case 'c':
        numChildTables = atoi(optarg);
        break;
      case 'r':
        numRowsPerChildTable = atoi(optarg);
        break;
      case 'f':
        numFields = atoi(optarg);
        break;
      case 't':
        numThreads = atoi(optarg);
        break;
      case 'm':
        maxLinesPerBatch = atoi(optarg);
        break;
G
Ganlin Zhao 已提交
126 127 128 129 130 131
      case 'p':
        tsPrecision = atoi(optarg);
        break;
      case 'P':
        lineProtocol = atoi(optarg);
        break;
132 133 134 135 136 137 138 139 140 141 142
      case 'h':
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
                argv[0]);
        exit(0);
      default: /* '?' */
        fprintf(stderr, "Usage: %s -s supertable -c childtable -r rows -f fields -t threads -m maxlines_per_batch\n",
                argv[0]);
        exit(-1);
    }
  }

Z
zhaoyanggh 已提交
143
  TAOS_RES*   result;
S
shenglian zhou 已提交
144 145 146 147 148 149 150 151 152 153 154
  const char* host = "127.0.0.1";
  const char* user = "root";
  const char* passwd = "taosdata";

  taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
  TAOS* taos = taos_connect(host, user, passwd, "", 0);
  if (taos == NULL) {
    printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos));
    exit(1);
  }

155
  if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) {
156
    printf("too many rows to be handle by threads with %d batches", MAX_THREAD_LINE_BATCHES);
157 158 159
    exit(2);
  }

S
shenglian zhou 已提交
160 161 162 163 164 165 166
  char* info = taos_get_server_info(taos);
  printf("server info: %s\n", info);
  info = taos_get_client_info(taos);
  printf("client info: %s\n", info);
  result = taos_query(taos, "drop database if exists db;");
  taos_free_result(result);
  usleep(100000);
167
  result = taos_query(taos, "create database db precision 'us';");
S
shenglian zhou 已提交
168 169 170 171 172
  taos_free_result(result);
  usleep(100000);

  (void)taos_select_db(taos, "db");

Z
zhaoyanggh 已提交
173
  time_t  ct = time(0);
S
shenglian zhou 已提交
174
  int64_t ts = ct * 1000;
175 176

  char* lineTemplate = calloc(65536, sizeof(char));
177
  getLineTemplate(lineTemplate, 65535, numFields);
S
shenglian zhou 已提交
178

179
  printf("setup supertables...");
180 181 182
  {
    char** linesStb = calloc(numSuperTables, sizeof(char*));
    for (int i = 0; i < numSuperTables; i++) {
183 184
      char* lineStb = calloc(strlen(lineTemplate)+128, 1);
      snprintf(lineStb, strlen(lineTemplate)+128, lineTemplate, i,
185 186
               numSuperTables * numChildTables,
               ts + numSuperTables * numChildTables * numRowsPerChildTable);
187 188 189 190
      linesStb[i] = lineStb;
    }
    SThreadInsertArgs args = {0};
    args.taos = taos;
191 192
    args.batches[0].lines = linesStb;
    args.batches[0].numLines = numSuperTables;
G
Ganlin Zhao 已提交
193 194
    args.tsPrecision = tsPrecision;
    args.lineProtocol = lineProtocol;
195 196 197 198 199 200 201 202
    insertLines(&args);
    for (int i = 0; i < numSuperTables; ++i) {
      free(linesStb[i]);
    }
    free(linesStb);
  }

  printf("generate lines...\n");
203 204
  pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
  SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
205
  for (int i = 0; i < numThreads; ++i) {
206 207
    argsThread[i].taos = taos;
    argsThread[i].numBatches = 0;
208 209
  }

210 211 212 213
  int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
  int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
  if (totalLines % maxLinesPerBatch != 0) {
    totalBatches += 1;
S
shenglian zhou 已提交
214 215
  }

216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
  char*** allBatches = calloc(totalBatches, sizeof(char**));
  for (int i = 0; i < totalBatches; ++i) {
    allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
    int threadNo = i % numThreads;
    int batchNo = i / numThreads;
    argsThread[threadNo].batches[batchNo].lines = allBatches[i];
    argsThread[threadNo].numBatches = batchNo + 1;
  }

  int l = 0;
  for (int i = 0; i < numSuperTables; ++i) {
    for (int j = 0; j < numChildTables; ++j) {
      for (int k = 0; k < numRowsPerChildTable; ++k) {
        int stIdx = i;
        int ctIdx = numSuperTables*numChildTables + j;
231
        char* line = calloc(strlen(lineTemplate)+128, 1);
232
        snprintf(line, strlen(lineTemplate)+128, lineTemplate, stIdx, ctIdx, ts + l);
233 234 235 236 237 238 239
        int batchNo = l / maxLinesPerBatch;
        int lineNo = l % maxLinesPerBatch;
        allBatches[batchNo][lineNo] =  line;
        argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
        ++l;
      }
    }
240 241
  }

242 243
  printf("begin multi-thread insertion...\n");
  int64_t begin = taosGetTimestampUs();
244

245 246 247 248 249 250
  for (int i=0; i < numThreads; ++i) {
    pthread_create(tids+i, NULL, insertLines, argsThread+i);
  }
  for (int i = 0; i < numThreads; ++i) {
    pthread_join(tids[i], NULL);
  }
251 252
  int64_t end = taosGetTimestampUs();

S
shenglian zhou 已提交
253
  size_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
254
  printf("TOTAL LINES: %zu\n", linesNum);
255 256 257 258
  printf("THREADS: %d\n", numThreads);
  printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
  double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
  printf("THROUGHPUT:%d/s\n", (int)throughput);
259 260 261 262 263 264

  for (int i = 0; i < totalBatches; ++i) {
    free(allBatches[i]);
  }
  free(allBatches);

265 266
  free(argsThread);
  free(tids);
267

268
  free(lineTemplate);
269
  taos_close(taos);
S
shenglian zhou 已提交
270 271
  return 0;
}