queryUtil.c 6.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
Haojun Liao 已提交
17 18
#include "query.h"
#include "tglobal.h"
L
Liu Jicong 已提交
19
#include "tmsg.h"
20
#include "trpc.h"
L
Liu Jicong 已提交
21
#include "tsched.h"
22

L
Liu Jicong 已提交
23 24
/* True when x lies within [TSDB_MIN_COLUMNS, TSDB_MAX_COLUMNS]. Note: x is evaluated twice. */
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
/* True when x lies within [0, TSDB_MAX_TAGS]. Note: x is evaluated twice. */
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
25 26 27

static struct SSchema _s = {
    .colId = TSDB_TBNAME_COLUMN_INDEX,
L
Liu Jicong 已提交
28
    .type = TSDB_DATA_TYPE_BINARY,
29 30 31 32
    .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
    .name = "tbname",
};

L
Liu Jicong 已提交
33
const SSchema* tGetTbnameColumnSchema() { return &_s; }
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

/*
 * Validates numOfCols consecutive schema entries starting at pSchema.
 *
 * An entry list is accepted when, for every entry:
 *   1. its type passes isValidDataType();
 *   2. its width is plausible: BINARY/NCHAR widths must be positive and no
 *      larger than TSDB_MAX_BINARY_LEN / TSDB_MAX_NCHAR_LEN, every other type
 *      must match the width recorded in tDataTypes[];
 *   3. its name differs from the name of every later entry;
 * and the sum of all widths does not exceed maxLen.
 *
 * An empty list (numOfCols <= 0) is accepted as long as maxLen >= 0.
 *
 * @param pSchema    first entry to validate
 * @param numOfCols  number of entries to validate
 * @param maxLen     upper bound for the summed width of all entries
 * @return true when every check passes, false otherwise
 */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  int32_t rowLen = 0;

  for (int32_t i = 0; i < numOfCols; ++i) {
    const SSchema* pCol = &pSchema[i];

    // 1. valid types
    if (!isValidDataType(pCol->type)) {
      return false;
    }

    // 2. valid length for each type
    // A non-positive width is rejected for variable-length types: it can never
    // describe a real column and would otherwise shrink rowLen, weakening the
    // row-length limit enforced at the end.
    if (pCol->type == TSDB_DATA_TYPE_BINARY) {
      if (pCol->bytes <= 0 || pCol->bytes > TSDB_MAX_BINARY_LEN) {
        return false;
      }
    } else if (pCol->type == TSDB_DATA_TYPE_NCHAR) {
      if (pCol->bytes <= 0 || pCol->bytes > TSDB_MAX_NCHAR_LEN) {
        return false;
      }
    } else {
      if (pCol->bytes != tDataTypes[pCol->type].bytes) {
        return false;
      }
    }

    // 3. valid column names: no later entry may reuse this entry's name
    for (int32_t j = i + 1; j < numOfCols; ++j) {
      if (strncmp(pCol->name, pSchema[j].name, sizeof(pCol->name) - 1) == 0) {
        return false;
      }
    }

    rowLen += pCol->bytes;
  }

  return rowLen <= maxLen;
}

/*
 * Checks a table schema laid out as numOfCols column entries immediately
 * followed by numOfTags tag entries.
 *
 * The schema is valid when:
 *   - pSchema is not NULL;
 *   - numOfCols satisfies VALIDNUMOFCOLS and numOfTags satisfies VALIDNUMOFTAGS;
 *   - the first column is of type TSDB_DATA_TYPE_TIMESTAMP;
 *   - the column entries pass doValidateSchema() with a row limit of
 *     TSDB_MAX_BYTES_PER_ROW;
 *   - the tag entries pass doValidateSchema() with a limit of TSDB_MAX_TAGS_LEN.
 *
 * @param pSchema    array holding numOfCols + numOfTags entries
 * @param numOfCols  number of column entries at the start of pSchema
 * @param numOfTags  number of tag entries following the columns
 * @return true when the schema is valid, false otherwise
 */
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
  // Reject a missing schema up front instead of dereferencing it below.
  if (pSchema == NULL) {
    return false;
  }

  if (!VALIDNUMOFCOLS(numOfCols) || !VALIDNUMOFTAGS(numOfTags)) {
    return false;
  }

  /* first column must be the timestamp, which is a primary key */
  // NOTE(review): reading pSchema[0] assumes TSDB_MIN_COLUMNS >= 1 - confirm.
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
    return false;
  }

  if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
    return false;
  }

  // Tag entries start right after the last column entry.
  return doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN);
}

/*
 * Scheduler handle shared by the task-queue functions in this file: assigned
 * by initTaskQueue(), passed to taosScheduleTask() by taosAsyncExec() and to
 * taosCleanUpScheduler() by cleanupTaskQueue(). NULL until initialized.
 */
static void* pTaskQueue = NULL;

int32_t initTaskQueue() {
S
Shengliang Guan 已提交
100
  int32_t queueSize = tsMaxShellConns * 2;
S
Shengliang Guan 已提交
101
  pTaskQueue = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc");
H
Haojun Liao 已提交
102 103 104 105 106
  if (NULL == pTaskQueue) {
    qError("failed to init task queue");
    return -1;
  }

S
Shengliang Guan 已提交
107
  qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
L
Liu Jicong 已提交
108
  return 0;
H
Haojun Liao 已提交
109 110 111 112
}

/*
 * Hands the scheduler stored in pTaskQueue to taosCleanUpScheduler() and then
 * forgets the handle, so that a later taosAsyncExec() or a repeated cleanup
 * never reuses a scheduler that has already been cleaned up.
 *
 * @return always 0
 */
int32_t cleanupTaskQueue() {
  taosCleanUpScheduler(pTaskQueue);
  pTaskQueue = NULL;  // do not keep a stale handle around
  return 0;
}

static void execHelper(struct SSchedMsg* pSchedMsg) {
  assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);

L
Liu Jicong 已提交
119 120
  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  int32_t           code = execFn(pSchedMsg->thandle);
H
Haojun Liao 已提交
121
  if (code != 0 && pSchedMsg->msg != NULL) {
L
Liu Jicong 已提交
122
    *(int32_t*)pSchedMsg->msg = code;
H
Haojun Liao 已提交
123 124 125 126 127 128 129
  }
}

/*
 * Queues execFn(execParam) on the scheduler stored in pTaskQueue.
 *
 * @param execFn     function to run; must not be NULL (asserted)
 * @param execParam  opaque argument forwarded to execFn
 * @param code       optional slot; execHelper() stores execFn's result there
 *                   when that result is non-zero. May be NULL.
 *                   NOTE(review): the slot is written when the task runs, so it
 *                   presumably has to outlive this call - confirm with callers.
 * @return always 0; the outcome of execFn is reported through code only
 */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  assert(execFn != NULL);

  // Fields not listed here are zero-initialized.
  SSchedMsg task = {
      .fp = execHelper,
      .ahandle = execFn,
      .thandle = execParam,
      .msg = code,
  };

  taosScheduleTask(pTaskQueue, &task);
  return 0;
}
138

139 140
/*
 * Copies the payload described by pInfo into a buffer obtained from
 * rpcMallocCont() and submits it with rpcSendRequestWithCtx().
 *
 * @param pTransporter    transport handle forwarded to the RPC layer
 * @param epSet           endpoint set forwarded to the RPC layer
 * @param pTransporterId  forwarded to the RPC layer unchanged
 * @param pInfo           request description; pInfo->fp must not be NULL
 *                        (asserted). The pointer itself is stored in the
 *                        message as info.ahandle.
 * @param persistHandle   stored in the message as info.persistHandle
 * @param rpcCtx          context forwarded to rpcSendRequestWithCtx()
 * @return TSDB_CODE_SUCCESS once the request has been handed to the RPC layer,
 *         or TSDB_CODE_TSC_OUT_OF_MEMORY (also stored in terrno) when the
 *         payload buffer could not be allocated
 *
 * NOTE(review): the buffer from rpcMallocCont() is not released here;
 * presumably the RPC layer takes ownership - confirm.
 */
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  // Check the precondition before pInfo is used for anything else.
  assert(pInfo != NULL && pInfo->fp != NULL);

  char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
  if (NULL == pMsg) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return terrno;
  }

  // memcpy() requires valid pointers even for a zero-byte copy, so skip it
  // entirely when there is no payload (pData may legitimately be NULL then).
  if (pInfo->msgInfo.len > 0) {
    memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
  }

  SRpcMsg rpcMsg = {.msgType = pInfo->msgType,
                    .pCont = pMsg,
                    .contLen = pInfo->msgInfo.len,
                    .info.ahandle = (void*)pInfo,
                    .info.handle = pInfo->msgInfo.handle,
                    .info.persistHandle = persistHandle,
                    .code = 0};

  rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
161

162 163
/*
 * Convenience form of asyncSendMsgToServerExt() for the common case: the
 * handle is not persisted and no RPC context is supplied.
 *
 * @return whatever asyncSendMsgToServerExt() returns
 */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
  const bool  persistHandle = false;
  void* const rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}

dengyihao's avatar
dengyihao 已提交
166
char* jobTaskStatusStr(int32_t status) {
D
dapan1121 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
  switch (status) {
    case JOB_TASK_STATUS_NULL:
      return "NULL";
    case JOB_TASK_STATUS_NOT_START:
      return "NOT_START";
    case JOB_TASK_STATUS_EXECUTING:
      return "EXECUTING";
    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
      return "PARTIAL_SUCCEED";
    case JOB_TASK_STATUS_SUCCEED:
      return "SUCCEED";
    case JOB_TASK_STATUS_FAILED:
      return "FAILED";
    case JOB_TASK_STATUS_CANCELLING:
      return "CANCELLING";
    case JOB_TASK_STATUS_CANCELLED:
      return "CANCELLED";
    case JOB_TASK_STATUS_DROPPING:
      return "DROPPING";
    default:
      break;
  }

  return "UNKNOWN";
}
D
dapan1121 已提交
192

193
/*
 * Builds an SSchema value from its parts.
 *
 * @param type   stored in the type field
 * @param bytes  stored in the bytes field
 * @param colId  stored in the colId field
 * @param name   copied into the name field with tstrncpy(), bounded by the
 *               capacity of that field
 * @return the populated schema, returned by value; fields not set here are zero
 */
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
  SSchema schema = {.type = type, .bytes = bytes, .colId = colId};

  tstrncpy(schema.name, name, tListLen(schema.name));
  return schema;
}
D
dapan1121 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228

/*
 * Releases the payload held in pRes->res according to pRes->msgType:
 *   - TDMT_VND_ALTER_TABLE / TDMT_MND_ALTER_STB: tFreeSTableMetaRsp() followed
 *     by taosMemoryFreeClear();
 *   - TDMT_VND_SUBMIT: tFreeSSubmitRsp();
 *   - TDMT_VND_QUERY:  taosArrayDestroy().
 *
 * After any of these cases pRes->res is NULL, so calling the function again on
 * the same result is a no-op rather than a second release. For any other
 * message type an error is logged and pRes->res is left as it was.
 *
 * Does nothing when pRes or pRes->res is NULL.
 */
void destroyQueryExecRes(SQueryExecRes* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp((STableMetaRsp*)pRes->res);
      // NOTE(review): by its name this frees the struct and NULLs the pointer - confirm.
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
      pRes->res = NULL;  // keep in line with the ALTER cases: no stale pointer
      break;
    }
    case TDMT_VND_QUERY: {
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;  // the array is gone; do not leave it reachable
      break;
    }
    default:
      qError("invalid exec result for request type %d", pRes->msgType);
      break;
  }
}