parInsertSml.c 13.7 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "ttime.h"

wmmhello's avatar
wmmhello 已提交
21 22 23 24 25 26 27 28 29 30
static void clearColValArray(SArray* pCols) {
  int32_t num = taosArrayGetSize(pCols);
  for (int32_t i = 0; i < num; ++i) {
    SColVal* pCol = taosArrayGet(pCols, i);
    if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
      taosMemoryFreeClear(pCol->value.pData);
    }
  }
}

X
Xiaoyu Wang 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
                     int32_t msgBufLen) {
  SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
  SToken  sToken;
  int32_t code = 0;
  char*   tbName = NULL;

  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n == 0) {
    return buildInvalidOperationMsg(&msg, "empty table name");
  }

  code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
  if (code) {
    return code;
  }

  NEXT_TOKEN(pTableName, sToken);

  if (sToken.n > 0) {
    return buildInvalidOperationMsg(&msg, "table name format is wrong");
  }

  return TSDB_CODE_SUCCESS;
}

58 59 60 61
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
  bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
  if (NULL == pUseCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
62 63
  }

64 65 66 67
  pBoundInfo->numOfBound = 0;
  int16_t lastColIdx = -1;  // last column found
  int32_t code = TSDB_CODE_SUCCESS;

X
Xiaoyu Wang 已提交
68
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
69
    SSmlKv*  kv = taosArrayGet(cols, i);
X
Xiaoyu Wang 已提交
70 71
    SToken   sToken = {.n = kv->keyLen, .z = (char*)kv->key};
    col_id_t t = lastColIdx + 1;
72 73
    col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
    uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
X
Xiaoyu Wang 已提交
74 75 76
    if (index < 0 && t > 0) {
      index = insFindCol(&sToken, 0, t, pSchema);
    }
77

X
Xiaoyu Wang 已提交
78 79
    if (index < 0) {
      uError("smlBoundColumnData. index:%d", index);
80 81
      code = TSDB_CODE_SML_INVALID_DATA;
      goto end;
X
Xiaoyu Wang 已提交
82
    }
83
    if (pUseCols[index]) {
X
Xiaoyu Wang 已提交
84
      uError("smlBoundColumnData. already set. index:%d", index);
85 86
      code = TSDB_CODE_SML_INVALID_DATA;
      goto end;
X
Xiaoyu Wang 已提交
87 88
    }
    lastColIdx = index;
89 90 91
    pUseCols[index] = true;
    pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
    ++pBoundInfo->numOfBound;
X
Xiaoyu Wang 已提交
92 93
  }

94 95
end:
  taosMemoryFree(pUseCols);
X
Xiaoyu Wang 已提交
96

97
  return code;
X
Xiaoyu Wang 已提交
98 99 100 101 102 103 104 105 106 107 108 109
}

/**
 * @brief No json tag for schemaless
 *
 * @param cols
 * @param tags
 * @param pSchema
 * @param ppTag
 * @param msg
 * @return int32_t
 */
110
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
X
Xiaoyu Wang 已提交
111 112 113
                              SMsgBuf* msg) {
  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
S
Shengliang Guan 已提交
114
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
115 116 117
  }
  *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!*tagName) {
S
Shengliang Guan 已提交
118
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
119 120 121 122
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int i = 0; i < tags->numOfBound; ++i) {
123
    SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
124
    SSmlKv*  kv = taosArrayGet(cols, i);
X
Xiaoyu Wang 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

    taosArrayPush(*tagName, pTagSchema->name);
    STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
    //    strcpy(val.colName, pTagSchema->name);
    if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
      val.pData = (uint8_t*)kv->value;
      val.nData = kv->length;
    } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
      int32_t output = 0;
      void*   p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
      if (p == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
        if (errno == E2BIG) {
          taosMemoryFree(p);
          code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
          goto end;
        }
        char buf[512] = {0};
        snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
        taosMemoryFree(p);
        code = buildSyntaxErrMsg(msg, buf, kv->value);
        goto end;
      }
      val.pData = p;
      val.nData = output;
    } else {
      memcpy(&val.i64, &(kv->value), kv->length);
    }
    taosArrayPush(pTagArray, &val);
  }

  code = tTagNew(pTagArray, 1, false, ppTag);
end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFree(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  return code;
}

X
Xiaoyu Wang 已提交
171
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta) {
172
  STableDataCxt* pTableCxt = NULL;
X
Xiaoyu Wang 已提交
173 174 175
  SVCreateTbReq* pCreateTbReq = NULL;
  int            ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
                                          sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, &pTableCxt, false);
176 177 178 179 180 181 182 183 184 185 186
  if (ret != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  ret = initTableColSubmitData(pTableCxt);
  if (ret != TSDB_CODE_SUCCESS) {
    return NULL;
  }
  return pTableCxt;
}

X
Xiaoyu Wang 已提交
187
int32_t smlBuildRow(STableDataCxt* pTableCxt) {
188
  SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
X
Xiaoyu Wang 已提交
189
  int    ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
190 191 192 193 194 195 196
  if (TSDB_CODE_SUCCESS != ret) {
    return ret;
  }
  insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
197 198
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index) {
  int      ret = TSDB_CODE_SUCCESS;
199 200
  SSchema* pColSchema = schema + index;
  SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
X
Xiaoyu Wang 已提交
201
  SSmlKv*  kv = (SSmlKv*)data;
wmmhello's avatar
wmmhello 已提交
202
  if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
wmmhello's avatar
wmmhello 已提交
203
    ret = TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
204
    uError("SML smlBuildCol error col not same %s", pColSchema->name);
wmmhello's avatar
wmmhello 已提交
205 206
    goto end;
  }
X
Xiaoyu Wang 已提交
207
  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
208
    int32_t len = 0;
wmmhello's avatar
wmmhello 已提交
209 210 211 212 213 214
    int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE;
    if(size <= 0){
      ret = TSDB_CODE_SML_INVALID_DATA;
      goto end;
    }
    char*   pUcs4 = taosMemoryCalloc(1, size);
215 216 217 218
    if (NULL == pUcs4) {
      ret = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }
wmmhello's avatar
wmmhello 已提交
219
    if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, size, &len)) {
220
      if (errno == E2BIG) {
wmmhello's avatar
wmmhello 已提交
221
        taosMemoryFree(pUcs4);
222 223 224
        ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
225
      taosMemoryFree(pUcs4);
226 227 228 229 230
      ret = TSDB_CODE_TSC_INVALID_VALUE;
      goto end;
    }
    pVal->value.pData = pUcs4;
    pVal->value.nData = len;
X
Xiaoyu Wang 已提交
231
  } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
232
    pVal->value.nData = kv->length;
X
Xiaoyu Wang 已提交
233
    pVal->value.pData = (uint8_t*)kv->value;
234 235 236 237 238 239 240 241 242
  } else {
    memcpy(&pVal->value.val, &(kv->value), kv->length);
  }
  pVal->flag = CV_FLAG_VALUE;

end:
  return ret;
}

X
Xiaoyu Wang 已提交
243 244 245
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
                    STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
                    char* msgBuf, int16_t msgBufLen) {
X
Xiaoyu Wang 已提交
246 247
  SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};

248 249 250 251
  SSchema*       pTagsSchema = getTableTagSchema(pTableMeta);
  SBoundColInfo  bindTags = {0};
  SVCreateTbReq* pCreateTblReq = NULL;
  SArray*        tagName = NULL;
252 253 254

  insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
  int ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
X
Xiaoyu Wang 已提交
255 256
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound tags error");
257
    goto end;
X
Xiaoyu Wang 已提交
258
  }
259

260
  STag* pTag = NULL;
261 262

  ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf);
X
Xiaoyu Wang 已提交
263
  if (ret != TSDB_CODE_SUCCESS) {
264
    goto end;
X
Xiaoyu Wang 已提交
265 266
  }

267 268 269 270 271
  pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == pCreateTblReq) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
272 273
  insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags,
                      ttl);
X
Xiaoyu Wang 已提交
274

275 276
  pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
  memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
X
Xiaoyu Wang 已提交
277

X
Xiaoyu Wang 已提交
278 279 280
  if (dataFormat) {
    STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj,
                                                             &pTableMeta->uid, sizeof(pTableMeta->uid));
281 282 283 284 285 286
    if (NULL == pTableCxt) {
      ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
      goto end;
    }
    (*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
    (*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
287 288
    (*pTableCxt)->pMeta->uid = pTableMeta->uid;
    (*pTableCxt)->pMeta->vgId = pTableMeta->vgId;
289 290 291 292
    pCreateTblReq = NULL;
    goto end;
  }

293
  STableDataCxt* pTableCxt = NULL;
X
Xiaoyu Wang 已提交
294
  ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
295
                           sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false);
X
Xiaoyu Wang 已提交
296
  if (ret != TSDB_CODE_SUCCESS) {
297 298
    buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
    goto end;
X
Xiaoyu Wang 已提交
299 300 301
  }

  SSchema* pSchema = getTableColumnSchema(pTableMeta);
302
  ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
X
Xiaoyu Wang 已提交
303 304
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "bound cols error");
305
    goto end;
X
Xiaoyu Wang 已提交
306 307
  }

308 309 310 311 312
  ret = initTableColSubmitData(pTableCxt);
  if (ret != TSDB_CODE_SUCCESS) {
    buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error");
    goto end;
  }
X
Xiaoyu Wang 已提交
313 314 315

  int32_t rowNum = taosArrayGetSize(cols);
  if (rowNum <= 0) {
316 317
    ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0");
    goto end;
X
Xiaoyu Wang 已提交
318
  }
319

X
Xiaoyu Wang 已提交
320
  for (int32_t r = 0; r < rowNum; ++r) {
321
    void* rowData = taosArrayGetP(cols, r);
X
Xiaoyu Wang 已提交
322 323

    // 1. set the parsed value from sql string
324 325 326
    for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
      SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
      SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
X
Xiaoyu Wang 已提交
327
      void**   p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
328 329 330
      if (p == NULL) {
        continue;
      }
X
Xiaoyu Wang 已提交
331
      SSmlKv* kv = *(SSmlKv**)p;
wmmhello's avatar
wmmhello 已提交
332 333 334 335
      if(kv->type != pColSchema->type){
        ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
        goto end;
      }
336 337 338
      if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
        kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
      }
339
      if (kv->type == TSDB_DATA_TYPE_NCHAR) {
340
        int32_t len = 0;
wmmhello's avatar
wmmhello 已提交
341
        char*   pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
342 343 344
        if (NULL == pUcs4) {
          ret = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
X
Xiaoyu Wang 已提交
345
        }
wmmhello's avatar
wmmhello 已提交
346
        if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
347 348 349 350 351 352 353
          if (errno == E2BIG) {
            buildInvalidOperationMsg(&pBuf, "value too long");
            ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
            goto end;
          }
          ret = buildInvalidOperationMsg(&pBuf, strerror(errno));
          goto end;
X
Xiaoyu Wang 已提交
354
        }
355 356
        pVal->value.pData = pUcs4;
        pVal->value.nData = len;
357 358 359
      } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
        pVal->value.nData = kv->length;
        pVal->value.pData = (uint8_t*)kv->value;
X
Xiaoyu Wang 已提交
360
      } else {
361
        memcpy(&pVal->value.val, &(kv->value), kv->length);
X
Xiaoyu Wang 已提交
362
      }
363
      pVal->flag = CV_FLAG_VALUE;
X
Xiaoyu Wang 已提交
364
    }
wmmhello's avatar
wmmhello 已提交
365 366 367 368 369 370 371 372

    SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
    ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
    if (TSDB_CODE_SUCCESS != ret) {
      buildInvalidOperationMsg(&pBuf, "tRowBuild error");
      goto end;
    }
    insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
wmmhello's avatar
wmmhello 已提交
373
    clearColValArray(pTableCxt->pValues);
X
Xiaoyu Wang 已提交
374 375
  }

376
end:
X
Xiaoyu Wang 已提交
377
  insDestroyBoundColInfo(&bindTags);
wmmhello's avatar
wmmhello 已提交
378 379
  tdDestroySVCreateTbReq(pCreateTblReq);
  taosMemoryFree(pCreateTblReq);
380 381
  taosArrayDestroy(tagName);
  return ret;
X
Xiaoyu Wang 已提交
382 383
}

wmmhello's avatar
wmmhello 已提交
384
SQuery* smlInitHandle() {
385
  SQuery* pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
wmmhello's avatar
wmmhello 已提交
386 387
  if (NULL == pQuery) {
    uError("create pQuery error");
388
    return NULL;
wmmhello's avatar
wmmhello 已提交
389 390 391 392
  }
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->haveResultSet = false;
  pQuery->msgType = TDMT_VND_SUBMIT;
X
Xiaoyu Wang 已提交
393
  SVnodeModifyOpStmt* stmt = (SVnodeModifyOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT);
wmmhello's avatar
wmmhello 已提交
394
  if (NULL == stmt) {
X
Xiaoyu Wang 已提交
395
    uError("create SVnodeModifyOpStmt error");
wmmhello's avatar
wmmhello 已提交
396 397 398
    qDestroyQuery(pQuery);
    return NULL;
  }
399
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
400 401
  stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
  stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
X
Xiaoyu Wang 已提交
402

403
  pQuery->pRoot = (SNode*)stmt;
wmmhello's avatar
wmmhello 已提交
404
  return pQuery;
X
Xiaoyu Wang 已提交
405 406
}

407
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
X
Xiaoyu Wang 已提交
408
  SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
wmmhello's avatar
wmmhello 已提交
409 410 411 412 413 414 415 416 417 418 419 420
  // merge according to vgId
  int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    uError("insMergeTableDataCxt failed");
    return code;
  }
  code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
  if (code != TSDB_CODE_SUCCESS) {
    uError("insBuildVgDataBlocks failed");
    return code;
  }
  return code;
X
Xiaoyu Wang 已提交
421
}