/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "ttime.h"

/**
 * Release the heap buffers attached to NCHAR entries of a column-value array.
 *
 * Every SColVal in pCols whose type is TSDB_DATA_TYPE_NCHAR has value.pData
 * passed to taosMemoryFreeClear; entries of any other type are left untouched.
 *
 * NOTE(review): flag and nData of the entries are not reset here -- confirm
 * that a cleared entry is always re-assigned before it is read again.
 */
static void clearColValArray(SArray* pCols) {
  const int32_t total = taosArrayGetSize(pCols);
  for (int32_t pos = 0; pos != total; pos++) {
    SColVal* pEntry = taosArrayGet(pCols, pos);
    if (pEntry->type != TSDB_DATA_TYPE_NCHAR) {
      continue;  // only NCHAR values own a converted buffer
    }
    taosMemoryFreeClear(pEntry->value.pData);
  }
}

/**
 * Fill pName from a textual table name.
 *
 * The string must contain exactly one token: the first token is handed to
 * insCreateSName together with acctId/dbName, and any further token is
 * rejected.
 *
 * @param pName       output name structure, filled by insCreateSName
 * @param pTableName  text to tokenize
 * @param acctId      account id forwarded to insCreateSName
 * @param dbName      database name forwarded to insCreateSName
 * @param msgBuf      buffer receiving an error message on failure
 * @param msgBufLen   capacity of msgBuf
 * @return TSDB_CODE_SUCCESS, the code returned by insCreateSName, or the code
 *         returned by buildInvalidOperationMsg for an empty / malformed name
 */
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf errBuf = {.buf = msgBuf, .len = msgBufLen};
  SToken  tok;

  // First token: the table name itself.
  NEXT_TOKEN(pTableName, tok);
  if (0 == tok.n) {
    return buildInvalidOperationMsg(&errBuf, "empty table name");
  }

  int32_t rc = insCreateSName(pName, &tok, acctId, dbName, &errBuf);
  if (rc) {
    return rc;
  }

  // Anything left after the name means the input was not a plain table name.
  NEXT_TOKEN(pTableName, tok);
  if (tok.n > 0) {
    return buildInvalidOperationMsg(&errBuf, "table name format is wrong");
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Map each schemaless key in `cols` to its position in `pSchema` and record
 * the positions, in input order, in pBoundInfo->pColIndex.
 *
 * The search for a key starts right after the previously matched position and
 * falls back to the range before it. For non-tag input the very first key is
 * bound to schema position 0 without a lookup.
 *
 * @param cols        array of SSmlKv; only key/keyLen are read
 * @param pBoundInfo  numOfCols is read; numOfBound and pColIndex are written
 * @param pSchema     schema array searched through insFindCol
 * @param isTag       false enables the "first key is position 0" shortcut
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or
 *         TSDB_CODE_SML_INVALID_DATA when a key is unknown or appears twice
 */
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
  bool* pSeen = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
  if (pSeen == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int16_t lastColIdx = -1;  // last column found
  pBoundInfo->numOfBound = 0;

  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv*  kv = taosArrayGet(cols, i);
    SToken   key = {.n = kv->keyLen, .z = (char*)kv->key};
    col_id_t start = lastColIdx + 1;
    col_id_t index;

    if (start == 0 && !isTag) {
      index = 0;
    } else {
      index = insFindCol(&key, start, pBoundInfo->numOfCols, pSchema);
    }
    uDebug("SML, index:%d, t:%d, ncols:%d", index, start, pBoundInfo->numOfCols);

    // Not found after the previous match: retry in the range before it.
    if (start > 0 && index < 0) {
      index = insFindCol(&key, 0, start, pSchema);
    }

    if (index < 0) {
      uError("smlBoundColumnData. index:%d", index);
      code = TSDB_CODE_SML_INVALID_DATA;
      break;
    }
    if (pSeen[index]) {
      uError("smlBoundColumnData. already set. index:%d", index);
      code = TSDB_CODE_SML_INVALID_DATA;
      break;
    }

    pSeen[index] = true;
    lastColIdx = index;
    pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
    pBoundInfo->numOfBound += 1;
  }

  taosMemoryFree(pSeen);
  return code;
}

/**
 * @brief No json tag for schemaless
 *
 * @param cols
 * @param tags
 * @param pSchema
 * @param ppTag
 * @param msg
 * @return int32_t
 */
110
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
X
Xiaoyu Wang 已提交
111 112 113
                              SMsgBuf* msg) {
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
S
Shengliang Guan 已提交
114
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
115 116 117
  }
  *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!*tagName) {
S
Shengliang Guan 已提交
118
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
119 120 121 122
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int i = 0; i < tags->numOfBound; ++i) {
123
    SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
124
    SSmlKv*  kv = taosArrayGet(cols, i);
X
Xiaoyu Wang 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

    taosArrayPush(*tagName, pTagSchema->name);
    STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
    //    strcpy(val.colName, pTagSchema->name);
    if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
      val.pData = (uint8_t*)kv->value;
      val.nData = kv->length;
    } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t output = 0;
      void*   p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
      if (p == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
        if (errno == E2BIG) {
          taosMemoryFree(p);
          code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
          goto end;
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        code = buildSyntaxErrMsg(msg, buf, kv->value);
        goto end;
      }
      val.pData = p;
      val.nData = output;
    } else {
      memcpy(&val.i64, &(kv->value), kv->length);
    }
    taosArrayPush(pTagArray, &val);
  }

  code = tTagNew(pTagArray, 1, false, ppTag);
end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  return code;
}

/**
 * Obtain the table data context for pTableMeta from the statement held in
 * query->pRoot and initialise it through initTableColSubmitData.
 *
 * The context is looked up (keyed by pTableMeta->uid) via insGetTableDataCxt
 * on the statement's pTableBlockHashObj.
 *
 * @param query       query whose pRoot is an SVnodeModifyOpStmt
 * @param pTableMeta  table meta identifying the table
 * @return the context, or NULL when either step fails
 */
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta) {
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(query->pRoot);
  SVCreateTbReq*      pCreateTbReq = NULL;
  STableDataCxt*      pTableCxt = NULL;

  int rc = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta,
                              &pCreateTbReq, &pTableCxt, false);
  if (TSDB_CODE_SUCCESS != rc) {
    return NULL;
  }

  rc = initTableColSubmitData(pTableCxt);
  return (TSDB_CODE_SUCCESS == rc) ? pTableCxt : NULL;
}

/**
 * Build one row from the values currently stored in pTableCxt->pValues and
 * append it to pTableCxt->pData->aRowP.
 *
 * A slot is reserved in aRowP first and tRowBuild writes the new row into it;
 * afterwards the row key is passed to insCheckTableDataOrder.
 *
 * @param pTableCxt  table data context holding pValues, pSchema and pData
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when no slot could be
 *         reserved, or the code returned by tRowBuild
 */
int32_t smlBuildRow(STableDataCxt* pTableCxt) {
  SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
  if (NULL == pRow) {
    // Without a slot there is nowhere for tRowBuild to store the row.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
  if (TSDB_CODE_SUCCESS != ret) {
    return ret;
  }
  insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
  return TSDB_CODE_SUCCESS;
}

/**
 * Store one schemaless key/value into the column-value slot `index` of
 * pTableCxt->pValues.
 *
 * The key name and type of the SSmlKv must match schema[index]. NCHAR values
 * are converted with taosMbsToUcs4 into a newly allocated buffer that is
 * attached to the slot; BINARY values reference kv->value directly; all other
 * types are copied by value.
 *
 * @param pTableCxt  table data context whose pValues array is updated
 * @param schema     column schema array
 * @param data       pointer to an SSmlKv
 * @param index      position in both schema and pValues
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SML_INVALID_DATA (name/type mismatch or
 *         non-positive NCHAR width), TSDB_CODE_OUT_OF_MEMORY,
 *         TSDB_CODE_PAR_VALUE_TOO_LONG or TSDB_CODE_TSC_INVALID_VALUE
 */
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index) {
  SSchema* pColSchema = schema + index;
  SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
  SSmlKv*  kv = (SSmlKv*)data;

  bool sameName = (kv->keyLen == strlen(pColSchema->name)) && (memcmp(kv->key, pColSchema->name, kv->keyLen) == 0);
  if (!sameName || kv->type != pColSchema->type) {
    return TSDB_CODE_SML_INVALID_DATA;
  }

  switch (kv->type) {
    case TSDB_DATA_TYPE_NCHAR: {
      int64_t cap = pColSchema->bytes - VARSTR_HEADER_SIZE;
      if (cap <= 0) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      char* pUcs4 = taosMemoryCalloc(1, cap);
      if (NULL == pUcs4) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      int32_t len = 0;
      if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, cap, &len)) {
        // Decide on the error code before freeing, while errno is untouched.
        int32_t failure = (errno == E2BIG) ? TSDB_CODE_PAR_VALUE_TOO_LONG : TSDB_CODE_TSC_INVALID_VALUE;
        taosMemoryFree(pUcs4);
        return failure;
      }
      pVal->value.pData = pUcs4;
      pVal->value.nData = len;
      break;
    }
    case TSDB_DATA_TYPE_BINARY:
      pVal->value.pData = (uint8_t*)kv->value;
      pVal->value.nData = kv->length;
      break;
    default:
      memcpy(&pVal->value.val, &(kv->value), kv->length);
      break;
  }

  pVal->flag = CV_FLAG_VALUE;
  return TSDB_CODE_SUCCESS;
}

/**
 * Bind schemaless tags and rows of one child table to the insert statement in
 * query->pRoot.
 *
 * Steps:
 *  1. bind `tags` against the tag schema and build the tag row;
 *  2. build a create-table request for `tableName` under `sTableName`;
 *  3. dataFormat == true: attach the request to the table data context already
 *     stored under pTableMeta->uid and return;
 *  4. dataFormat == false: obtain the context, bind `colsSchema`, then build
 *     one row per element of `cols`.
 *
 * @param query          query whose pRoot is an SVnodeModifyOpStmt
 * @param dataFormat     selects step 3 (true) or step 4 (false)
 * @param tags           array of SSmlKv tag values
 * @param colsSchema     array of SSmlKv naming the columns to bind
 * @param cols           array of pointers to per-row hashes, each mapping a
 *                       column name to an SSmlKv pointer
 * @param pTableMeta     meta of the target table
 * @param tableName      child table name for the create-table request
 * @param sTableName     super table name (sTableNameLen bytes are copied)
 * @param sTableNameLen  length of sTableName
 * @param ttl            ttl forwarded to insBuildCreateTbReq
 * @param msgBuf         buffer receiving an error message on failure
 * @param msgBufLen      capacity of msgBuf
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
                    STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
                    char* msgBuf, int16_t msgBufLen) {
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

  SSchema*       pTagsSchema = getTableTagSchema(pTableMeta);
  SBoundColInfo  bindTags = {0};
  SVCreateTbReq* pCreateTblReq = NULL;
  SArray*        tagName = NULL;
  STag*          pTag = NULL;
  STableDataCxt* pTableCxt = NULL;

  insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
  int ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
    goto end;
  }

  ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pCreateTblReq) {
    // The tag row has not been handed to a request yet, so nothing else will
    // release it.
    taosMemoryFree(pTag);
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags,
                      ttl);

  pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
  if (NULL == pCreateTblReq->ctb.stbName) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);

  if (dataFormat) {
    STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj,
                                                         &pTableMeta->uid, sizeof(pTableMeta->uid));
    if (NULL == ppCxt) {
      ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
      goto end;
    }
    (*ppCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
    (*ppCxt)->pData->pCreateTbReq = pCreateTblReq;
    (*ppCxt)->pMeta->uid = pTableMeta->uid;
    (*ppCxt)->pMeta->vgId = pTableMeta->vgId;
    // The request now belongs to the context; keep the cleanup below away from it.
    pCreateTblReq = NULL;
    goto end;
  }

  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
    goto end;
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);
  ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
    goto end;
  }

  ret = initTableColSubmitData(pTableCxt);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error");
    goto end;
  }

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
    ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0");
    goto end;
  }

  for (int32_t r = 0; r < rowNum; ++r) {
    void* rowData = taosArrayGetP(cols, r);

    // 1. set the parsed value from sql string
    for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
      SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
      void**   p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
      if (p == NULL) {
        continue;  // this row carries no value for the column
      }
      SSmlKv* kv = *(SSmlKv**)p;
      if (kv->type != pColSchema->type) {
        ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
        goto rowFailed;
      }
      if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
        kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
      }
      if (kv->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t len = 0;
        char*   pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
        if (NULL == pUcs4) {
          ret = TSDB_CODE_OUT_OF_MEMORY;
          goto rowFailed;
        }
        if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
          // errno is consumed before the buffer is freed; the buffer was never
          // attached to pVal, so it must be released here.
          if (errno == E2BIG) {
            taosMemoryFree(pUcs4);
            buildInvalidOperationMsg(&pBuf, "value too long");
            ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
            goto rowFailed;
          }
          ret = buildInvalidOperationMsg(&pBuf, strerror(errno));
          taosMemoryFree(pUcs4);
          goto rowFailed;
        }
        pVal->value.pData = pUcs4;
        pVal->value.nData = len;
      } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
        pVal->value.nData = kv->length;
        pVal->value.pData = (uint8_t*)kv->value;
      } else {
        memcpy(&pVal->value.val, &(kv->value), kv->length);
      }
      pVal->flag = CV_FLAG_VALUE;
    }

    // 2. encode the collected values as a row appended to the submit data
    SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
    if (NULL == pRow) {
      buildInvalidOperationMsg(&pBuf, "taosArrayReserve error");
      ret = TSDB_CODE_OUT_OF_MEMORY;
      goto rowFailed;
    }
    ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
    if (TSDB_CODE_SUCCESS != ret) {
      buildInvalidOperationMsg(&pBuf, "tRowBuild error");
      goto rowFailed;
    }
    insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
    clearColValArray(pTableCxt->pValues);
  }
  goto end;

rowFailed:
  // Release NCHAR buffers already attached to the value array for the row that
  // could not be completed.
  clearColValArray(pTableCxt->pValues);

end:
  insDestroyBoundColInfo(&bindTags);
  tdDestroySVCreateTbReq(pCreateTblReq);
  taosMemoryFree(pCreateTblReq);
  taosArrayDestroy(tagName);
  return ret;
}

SQuery* smlInitHandle() {
384
  SQuery* pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
wmmhello's avatar
wmmhello 已提交
385 386
  if (NULL == pQuery) {
    uError("create pQuery error");
387
    return NULL;
wmmhello's avatar
wmmhello 已提交
388 389 390 391
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
X
Xiaoyu Wang 已提交
392
  SVnodeModifyOpStmt* stmt = (SVnodeModifyOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT);
wmmhello's avatar
wmmhello 已提交
393
  if (NULL == stmt) {
X
Xiaoyu Wang 已提交
394
    uError("create SVnodeModifyOpStmt error");
wmmhello's avatar
wmmhello 已提交
395 396 397
    qDestroyQuery(pQuery);
    return NULL;
  }
398
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
399 400
  stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
  stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
X
Xiaoyu Wang 已提交
401

402
  pQuery->pRoot = (SNode*)stmt;
wmmhello's avatar
wmmhello 已提交
403
  return pQuery;
X
Xiaoyu Wang 已提交
404 405
}

/**
 * Turn the per-table data collected in the handle into per-vgroup blocks.
 *
 * First insMergeTableDataCxt merges the statement's table-block hash into
 * pVgDataBlocks, then insBuildVgDataBlocks produces pDataBlocks from them
 * using pVgHash.
 *
 * @param handle   query whose pRoot is an SVnodeModifyOpStmt
 * @param pVgHash  vgroup hash forwarded to insBuildVgDataBlocks
 * @return TSDB_CODE_SUCCESS or the code of the step that failed
 */
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;

  // merge according to vgId
  int32_t rc = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
  if (TSDB_CODE_SUCCESS != rc) {
    uError("insMergeTableDataCxt failed");
    return rc;
  }

  rc = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
  if (TSDB_CODE_SUCCESS != rc) {
    uError("insBuildVgDataBlocks failed");
  }
  return rc;
}