#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uv.h"

#include "fnLog.h"
#include "os.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tudf.h"

/**
 * Parse the command-line arguments of the UDF test runner.
 *
 * Only "-c <dir>" is recognised; its value is copied into configDir.
 * Every other argument is ignored.
 *
 * @param argc  number of entries in argv
 * @param argv  argument vector; argv[0] is skipped
 * @return 0 on success, -1 when "-c" is the last argument or when its value
 *         is PATH_MAX characters or longer
 */
static int32_t parseArgs(int32_t argc, char *argv[]) {
  for (int32_t i = 1; i < argc; ++i) {
    if (strcmp(argv[i], "-c") != 0) {
      continue;
    }

    if (i >= argc - 1) {
      printf("'-c' requires a parameter, default is %s\n", configDir);
      return -1;
    }

    // Consume the option value so the next iteration resumes after it.
    const char *dir = argv[++i];
    if (strlen(dir) >= PATH_MAX) {
      // Newline-terminated so the message does not run into later output.
      printf("config file path overflow\n");
      return -1;
    }
    tstrncpy(configDir, dir, PATH_MAX);
  }

  return 0;
}
static int32_t initLog() {
  char logName[12] = {0};
  snprintf(logName, sizeof(logName), "%slog", "udfc");
wafwerar's avatar
wafwerar 已提交
34
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
35 36
}

int scalarFuncTest() {
38 39
  UdfcFuncHandle handle;

S
slzhou 已提交
40 41 42 43
  if (doSetupUdf("udf1", &handle) != 0) {
    fnError("setup udf failure");
    return -1;
  }
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
  int64_t beg = taosGetTimestampUs();
  for (int k = 0; k < 1; ++k) {
    SSDataBlock  block = {0};
    SSDataBlock *pBlock = &block;
    pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
    pBlock->info.numOfCols = 1;
    pBlock->info.rows = 1024;
    for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
      SColumnInfoData colInfo = {0};
      colInfo.info.type = TSDB_DATA_TYPE_INT;
      colInfo.info.bytes = sizeof(int32_t);
      colInfo.info.colId = 1;
      colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
      for (int32_t j = 0; j < pBlock->info.rows; ++j) {
        colDataAppendInt32(&colInfo, j, &j);
      }
      taosArrayPush(pBlock->pDataBlock, &colInfo);
S
slzhou 已提交
61
    }
62

63 64 65 66 67 68 69 70 71 72 73 74 75
    SScalarParam input = {0};
    input.numOfRows = pBlock->info.rows;
    input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
    SScalarParam output = {0};
    doCallUdfScalarFunc(handle, &input, 1, &output);
    taosArrayDestroy(pBlock->pDataBlock);
    SColumnInfoData *col = output.columnData;
    for (int32_t i = 0; i < output.numOfRows; ++i) {
      if (i % 100 == 0)
        fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
    }
    colDataDestroy(output.columnData);
    taosMemoryFree(output.columnData);
76
  }
77 78
  int64_t end = taosGetTimestampUs();
  fprintf(stderr, "time: %f\n", (end-beg)/1000.0);
79
  doTeardownUdf(handle);
S
slzhou 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95

  return 0;
}

/**
 * Run the aggregate UDF "udf2" through its init / process / finalize steps
 * over a single INT column of 1024 rows holding the values 0..1023, and
 * print the result (read as a double) to stderr.
 *
 * @return 0 on success, -1 if the UDF cannot be set up, the input array
 *         cannot be allocated, any aggregate step fails, or finalize yields
 *         no result buffer
 */
int aggregateFuncTest() {
  UdfcFuncHandle handle;

  if (doSetupUdf("udf2", &handle) != 0) {
    fnError("setup udf failure");
    return -1;
  }

  int          ret = -1;
  SUdfInterBuf buf = {0};
  SUdfInterBuf newBuf = {0};
  SUdfInterBuf resultBuf = {0};

  SSDataBlock  block = {0};
  SSDataBlock *pBlock = &block;
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    fnError("failed to allocate input column array");
    goto cleanup;
  }
  pBlock->info.numOfCols = 1;
  pBlock->info.rows = 1024;
  for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.info.type = TSDB_DATA_TYPE_INT;
    colInfo.info.bytes = sizeof(int32_t);
    colInfo.info.colId = 1;
    colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
    // Row j holds the value j.
    for (int32_t j = 0; j < pBlock->info.rows; ++j) {
      colDataAppendInt32(&colInfo, j, &j);
    }
    taosArrayPush(pBlock->pDataBlock, &colInfo);
  }

  if (doCallUdfAggInit(handle, &buf) != 0) {
    fnError("call udf aggregate init failure");
    goto cleanup;
  }
  if (doCallUdfAggProcess(handle, pBlock, &buf, &newBuf) != 0) {
    fnError("call udf aggregate process failure");
    goto cleanup;
  }
  if (doCallUdfAggFinalize(handle, &newBuf, &resultBuf) != 0) {
    fnError("call udf aggregate finalize failure");
    goto cleanup;
  }

  // Never read the result through a missing buffer.
  if (resultBuf.buf == NULL) {
    fnError("udf aggregate finalize returned no result");
    goto cleanup;
  }
  fprintf(stderr, "agg result: %f\n", *(double*)resultBuf.buf);
  ret = 0;

cleanup:
  // NOTE(review): only the array is released here; the buffers reserved by
  // colInfoDataEnsureCapacity are not explicitly freed -- confirm ownership.
  if (pBlock->pDataBlock != NULL) {
    taosArrayDestroy(pBlock->pDataBlock);
  }
  freeUdfInterBuf(&buf);
  freeUdfInterBuf(&newBuf);
  freeUdfInterBuf(&resultBuf);
  doTeardownUdf(handle);

  return ret;
}

/**
 * Entry point of the UDF client test runner.
 *
 * Parses "-c <dir>", creates the log, loads the configuration from
 * configDir, opens the UDF client, runs the scalar and aggregate tests and
 * closes the client again.
 *
 * @return 0 when start-up and both tests succeed, -1 otherwise
 */
int main(int argc, char *argv[]) {
  // parseArgs has already printed the reason when it fails.
  if (parseArgs(argc, argv) != 0) {
    return -1;
  }
  if (initLog() != 0) {
    // Printed directly because the log could not be created.
    printf("failed to start since init log error\n");
    return -1;
  }
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
    fnError("failed to start since read config error");
    return -1;
  }

  udfcOpen();
  // NOTE(review): presumably gives the UDF client time to come up before the
  // first call -- confirm whether a fixed one-second wait is required.
  uv_sleep(1000);

  // Both tests always run; a failure in the first does not skip the second.
  int scalarRc = scalarFuncTest();
  int aggRc = aggregateFuncTest();

  udfcClose();

  return (scalarRc == 0 && aggRc == 0) ? 0 : -1;
}