clientStmt.c 32.4 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5

#include "clientInt.h"
#include "clientLog.h"
#include "tdef.h"

X
Xiaoyu Wang 已提交
6 7
#include "clientStmt.h"

dengyihao's avatar
dengyihao 已提交
8 9
/*
 * Printable names for the statement states (ten entries).
 * NOTE(review): presumably indexed by STMT_STATUS value for sequence logging — confirm the order
 * against the enum declared in clientStmt.h.
 */
char* gStmtStatusStr[] = {
    "unknown",
    "init",
    "prepare",
    "settbname",
    "settags",
    "fetchFields",
    "bind",
    "bindCol",
    "addBatch",
    "exec",
};
10

11
/*
 * Lazily builds the request object of a statement.
 *
 * Nothing happens when pStmt->exec.pRequest is already set. Otherwise buildRequest() is called
 * with the statement's SQL text and current reqid; a non-zero reqid is then incremented (whether
 * or not the build succeeded) and a successfully built request is flagged as a synchronous query.
 *
 * Returns 0 when a request already existed, otherwise the code returned by buildRequest().
 */
static int32_t stmtCreateRequest(STscStmt* pStmt) {
  if (NULL != pStmt->exec.pRequest) {
    return 0;
  }

  int32_t code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false,
                              &pStmt->exec.pRequest, pStmt->reqid);

  // A reqid of 0 is left untouched; any other value advances for the next request.
  if (0 != pStmt->reqid) {
    pStmt->reqid += 1;
  }

  if (TSDB_CODE_SUCCESS == code) {
    pStmt->exec.pRequest->syncQuery = true;
  }

  return code;
}

D
stmt  
dapan1121 已提交
28
/*
 * Validates and performs a state transition of the statement.
 *
 * The requested state is checked against the current pStmt->sql.status; when the transition is
 * not allowed the status is left unchanged and TSDB_CODE_TSC_STMT_API_ERROR is returned through
 * STMT_ERR_RET. A state that is not handled below yields TSDB_CODE_TSC_APP_ERROR. On success the
 * new state is stored and TSDB_CODE_SUCCESS is returned.
 */
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
  bool illegal = false;

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  switch (newStatus) {
    case STMT_PREPARE:
      // prepare is accepted from every state
      break;

    case STMT_SETTBNAME:
      illegal = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_SETTAGS:
      illegal = STMT_STATUS_NE(SETTBNAME) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_FETCH_FIELDS:
      illegal = STMT_STATUS_EQ(INIT);
      break;

    case STMT_BIND:
      illegal = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_BIND_COL:
      illegal = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;

    case STMT_ADD_BATCH:
      illegal = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_EXECUTE:
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
        // queries may additionally execute straight after a bind
        illegal = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS) && STMT_STATUS_NE(BIND) &&
                  STMT_STATUS_NE(BIND_COL);
      } else {
        illegal = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      }
      break;

    default:
      STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
      break;
  }

  if (illegal) {
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
97
/*
 * Hands out the table name currently stored in the statement's bind info.
 *
 * Side effect: the statement type is set to STMT_TYPE_MULTI_INSERT before the name is checked,
 * so it changes even when the call fails.
 *
 * Returns TSDB_CODE_TSC_STMT_TBNAME_ERROR (via STMT_ERR_RET) when no name has been set; otherwise
 * stores a pointer to the statement-owned buffer in *tbName and returns TSDB_CODE_SUCCESS.
 */
int32_t stmtGetTbName(TAOS_STMT* stmt, char** tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  const bool hasName = (pStmt->bInfo.tbName[0] != '\0');
  if (!hasName) {
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
112
/*
 * Saves the column count, precision and the `fields` / `userFields` arrays of the current
 * request's result info into pStmt->sql.queryRes.
 *
 * Buffers left over from an earlier backup are released first; previously they were simply
 * overwritten, leaking one pair of arrays per repeated call. The saved arrays stay owned by
 * pStmt->sql.queryRes (stmtCleanSQLInfo() frees them).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY (via STMT_ERR_RET) when either
 * allocation fails.
 */
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
  SStmtQueryResInfo* pRes = &pStmt->sql.queryRes;

  // Drop any snapshot from a previous call before taking a new one.
  taosMemoryFreeClear(pRes->fields);
  taosMemoryFreeClear(pRes->userFields);

  pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
  pRes->precision = pStmt->exec.pRequest->body.resInfo.precision;

  int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD);
  pRes->fields = taosMemoryMalloc(size);
  pRes->userFields = taosMemoryMalloc(size);
  if (NULL == pRes->fields || NULL == pRes->userFields) {
    // Whatever was allocated remains attached to pRes and is released by stmtCleanSQLInfo().
    STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }

  memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
  memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);

  return TSDB_CODE_SUCCESS;
}

/*
 * Copies the column count and precision saved in pStmt->sql.queryRes back into the current
 * request's result info, and re-creates the request's `fields` / `userFields` arrays from the
 * saved copies when the request does not have them yet.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY (via STMT_ERR_RET) when an
 * allocation fails.
 */
int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
  SStmtQueryResInfo* pSaved = &pStmt->sql.queryRes;
  int32_t            bytes = pSaved->numOfCols * sizeof(TAOS_FIELD);

  pStmt->exec.pRequest->body.resInfo.numOfCols = pSaved->numOfCols;
  pStmt->exec.pRequest->body.resInfo.precision = pSaved->precision;

  // Arrays already present on the request are kept as they are.
  if (pStmt->exec.pRequest->body.resInfo.fields == NULL) {
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(bytes);
    if (pStmt->exec.pRequest->body.resInfo.fields == NULL) {
      STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
    memcpy(pStmt->exec.pRequest->body.resInfo.fields, pSaved->fields, bytes);
  }

  if (pStmt->exec.pRequest->body.resInfo.userFields == NULL) {
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(bytes);
    if (pStmt->exec.pRequest->body.resInfo.userFields == NULL) {
      STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
    memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pSaved->userFields, bytes);
  }

  return TSDB_CODE_SUCCESS;
}

155
/*
 * Records the table that subsequent binds target.
 *
 * Copies the full table name and super-table name into the bind info, stores suid / table type
 * from pTableMeta and the bound-tags pointer, and marks the tags as not cached. The table uid is
 * stored as 0 when autoCreateTbl is set, otherwise it is taken from pTableMeta.
 *
 * Both names are copied with tstrncpy so the two bounded copies in this function follow the
 * same convention (the first one used a hand-written strncpy + terminator before).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, const char* sTableName, bool autoCreateTbl) {
  STscStmt* pStmt = (STscStmt*)stmt;

  tstrncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName));

  pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbType = pTableMeta->tableType;
  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;
  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
171
/*
 * Stores the vgroup hash in the statement's SQL info, and the block hash plus the
 * auto-create-table flag in its exec info. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->exec.autoCreateTbl = autoCreateTbl;
  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}

181
/*
 * Updates bind info and exec info in one call (registered as the `setInfoFn` callback in
 * stmtParseSql) and records the auto-create-table flag on the SQL info as well.
 *
 * Returns the first non-success code of stmtUpdateBindInfo() / stmtUpdateExecInfo(), otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl,
                       SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) {
  int32_t code = stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName, sTableName, autoCreateTbl);
  STMT_ERR_RET(code);

  code = stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl);
  STMT_ERR_RET(code);

  ((STscStmt*)stmt)->sql.autoCreateTbl = autoCreateTbl;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
193 194 195
/*
 * Returns the statement's vgroup hash (from the SQL info) and block hash (from the exec info)
 * through the two out-parameters. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  *pBlockHash = pStmt->exec.pBlockHash;
  *pVgHash = pStmt->sql.pVgHash;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
202
/*
 * For multi-table inserts, caches a clone of the current table's data block in
 * pStmt->sql.pTableCache so later tables of the same kind can reuse it.
 *
 * The cache key is the super-table uid for child tables and the table's own uid otherwise.
 * Nothing is done for other statement types or when the key is already cached.
 *
 * Returns TSDB_CODE_SUCCESS, the error of qCloneStmtDataBlock(), or TSDB_CODE_OUT_OF_MEMORY when
 * the current table is missing from the exec block hash or the cache insertion fails.
 */
int32_t stmtCacheBlock(STscStmt* pStmt) {
  if (STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = uid;
  if (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) {
    cacheUid = pStmt->bInfo.tbSuid;
  }

  if (NULL != taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks** ppSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (NULL == ppSrc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableDataBlocks* pClone = NULL;
  STMT_ERR_RET(qCloneStmtDataBlock(&pClone, *ppSrc));

  SStmtTableCache entry = {
      .pDataBlock = pClone,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (0 != taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &entry, sizeof(entry))) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The cache entry now references the bound tags: either flag them as cached or let go of them.
  if (!pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.boundTags = NULL;
  } else {
    pStmt->bInfo.tagsCached = true;
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
240 241
/*
 * Parses the statement's SQL text.
 *
 * Ensures a request exists, runs parseSql() with the statement callbacks, clears the needParse
 * flag and derives the statement type from the parsed query: STMT_TYPE_INSERT when the query has
 * a root and no type was set yet, otherwise STMT_TYPE_QUERY when it has a prepare root.
 *
 * Returns TSDB_CODE_SUCCESS or the first error of stmtCreateRequest() / parseSql().
 */
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback callbacks = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &callbacks));

  pStmt->bInfo.needParse = false;

  const bool untypedWithRoot = pStmt->sql.pQuery->pRoot && (0 == pStmt->sql.type);
  if (untypedWithRoot) {
    pStmt->sql.type = STMT_TYPE_INSERT;
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
    pStmt->sql.type = STMT_TYPE_QUERY;
  }

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
262

D
stmt  
dapan1121 已提交
263
/*
 * Resets the bind info of the statement: table ids and type are zeroed, the table names are
 * emptied, needParse is set and inExecCache is cleared. The bound tags are destroyed and freed
 * only when they are not flagged as cached. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbUid = 0;

  pStmt->bInfo.tbFName[0] = 0;
  pStmt->bInfo.tbName[0] = 0;

  // Cached tags are skipped here and left in place.
  if (!pStmt->bInfo.tagsCached) {
    destroyBoundColumnInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
280 281
/*
 * Releases the per-execution state of the statement.
 *
 * - The request is freed unless this is a query statement and deepClean is false.
 * - Every block in pStmt->exec.pBlockHash is released and removed; multi-insert statements use
 *   qFreeStmtDataBlock(), all others qDestroyStmtDataBlock(). With keepTable set, the block of
 *   the current table (matched by full table name) is reset via qResetStmtDataBlock() and kept.
 * - exec.autoCreateTbl is cleared.
 * - When keepTable is false the block hash itself is destroyed and the bind info is reset.
 *
 * Compared with the previous version, the unused per-iteration table-meta lookup was dropped and
 * the iterator is advanced in a single place.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of qResetStmtDataBlock() / stmtCleanBindInfo().
 */
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
  if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  size_t keyLen = 0;
  void*  pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
  while (pIter) {
    STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
    char*             key = taosHashGetKey(pIter, &keyLen);

    bool isCurrentTable =
        keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0;

    if (isCurrentTable) {
      STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
    } else {
      if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
        qFreeStmtDataBlock(pBlocks);
      } else {
        qDestroyStmtDataBlock(pBlocks);
      }
      taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
    }

    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
  }

  pStmt->exec.autoCreateTbl = false;

  if (keepTable) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pStmt->exec.pBlockHash);
  pStmt->exec.pBlockHash = NULL;

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

/*
 * Releases everything hanging off pStmt->sql and zeroes the structure.
 *
 * Frees the saved query result fields, the SQL string, the parsed query, the node list and the
 * vgroup hash; destroys the data block and bound tags of every table-cache entry before cleaning
 * up the cache itself; then deep-cleans the exec info and the bind info.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of stmtCleanExecInfo() / stmtCleanBindInfo() (in which
 * case pStmt->sql is not zeroed).
 */
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);

  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  // Cache entries own their data block and bound tags; release them before the hash goes away.
  for (void* pEntry = taosHashIterate(pStmt->sql.pTableCache, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStmt->sql.pTableCache, pEntry)) {
    SStmtTableCache* pCache = (SStmtTableCache*)pEntry;

    qDestroyStmtDataBlock(pCache->pDataBlock);
    destroyBoundColumnInfo(pCache->boundTags);
    taosMemoryFreeClear(pCache->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  memset(&pStmt->sql, 0, sizeof(pStmt->sql));

  return TSDB_CODE_SUCCESS;
}

354
/*
 * Builds a new data block for the table named by pStmt->bInfo.sname from a cached block.
 *
 * Looks up the table's vgroup through the catalog, records the vgroup info in
 * pStmt->sql.pVgHash (keyed by vgId) and calls qRebuildStmtDataBlock() with the given uid and
 * that vgId. The result is returned through *newBlock.
 *
 * The unused local copy of the management endpoint set (an extra getEpSet_s() call whose result
 * was never read) has been removed.
 *
 * Returns TSDB_CODE_SUCCESS or the first failing call's code (via STMT_ERR_RET).
 */
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo));
  STMT_ERR_RET(
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));

  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
371
int32_t stmtGetFromCache(STscStmt* pStmt) {
D
stmt  
dapan1121 已提交
372
  pStmt->bInfo.needParse = true;
D
dapan 已提交
373
  pStmt->bInfo.inExecCache = false;
374 375 376

  STableDataBlocks* pBlockInExec =
      taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
D
dapan 已提交
377
  if (pBlockInExec) {
D
dapan1121 已提交
378
    pStmt->bInfo.needParse = false;
D
dapan 已提交
379 380 381
    pStmt->bInfo.inExecCache = true;

    if (pStmt->sql.autoCreateTbl) {
D
dapan1121 已提交
382
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
D
dapan 已提交
383 384 385
      return TSDB_CODE_SUCCESS;
    }
  }
D
stmt  
dapan1121 已提交
386

D
stmt  
dapan1121 已提交
387
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
D
dapan 已提交
388 389 390
    if (pStmt->bInfo.inExecCache) {
      ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1);
      pStmt->bInfo.needParse = false;
D
dapan1121 已提交
391
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
D
dapan 已提交
392 393
      return TSDB_CODE_SUCCESS;
    }
394

D
dapan1121 已提交
395
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
D
stmt  
dapan1121 已提交
396 397 398
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
399 400 401 402
  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
  }

D
dapan 已提交
403 404 405 406
  if (pStmt->sql.autoCreateTbl) {
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
    if (pCache) {
      pStmt->bInfo.needParse = false;
D
dapan1121 已提交
407 408 409
      pStmt->exec.autoCreateTbl = true;

      pStmt->bInfo.tbUid = 0;
410

D
dapan 已提交
411
      STableDataBlocks* pNewBlock = NULL;
D
dapan 已提交
412
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0));
413 414 415

      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                      POINTER_BYTES)) {
D
dapan 已提交
416 417
        STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }
418

H
Hongze Cheng 已提交
419
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
D
dapan1121 已提交
420

D
dapan 已提交
421 422
      return TSDB_CODE_SUCCESS;
    }
423

D
dapan 已提交
424 425
    STMT_RET(stmtCleanBindInfo(pStmt));
  }
D
stmt  
dapan1121 已提交
426

427 428
  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
D
dapan1121 已提交
429 430 431
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
432
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
D
dapan1121 已提交
433 434
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
X
Xiaoyu Wang 已提交
435

D
dapan1121 已提交
436 437
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);

D
dapan1121 已提交
438 439 440 441
    return TSDB_CODE_SUCCESS;
  }

  STMT_ERR_RET(code);
X
Xiaoyu Wang 已提交
442

D
dapan1121 已提交
443 444
  uint64_t uid = pTableMeta->uid;
  uint64_t suid = pTableMeta->suid;
X
Xiaoyu Wang 已提交
445
  int8_t   tableType = pTableMeta->tableType;
D
dapan1121 已提交
446
  taosMemoryFree(pTableMeta);
D
dapan 已提交
447
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
448

D
dapan1121 已提交
449
  if (uid == pStmt->bInfo.tbUid) {
D
stmt  
dapan1121 已提交
450
    pStmt->bInfo.needParse = false;
D
dapan1121 已提交
451

D
dapan1121 已提交
452 453
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
454 455 456
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
457
  if (pStmt->bInfo.inExecCache) {
D
dapan 已提交
458
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
D
stmt  
dapan1121 已提交
459
    if (NULL == pCache) {
460 461 462
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
               pStmt->bInfo.tbFName, uid, cacheUid);

D
stmt  
dapan1121 已提交
463 464
      STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
    }
X
Xiaoyu Wang 已提交
465

D
stmt  
dapan1121 已提交
466
    pStmt->bInfo.needParse = false;
X
Xiaoyu Wang 已提交
467

D
dapan1121 已提交
468 469 470
    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
D
stmt  
dapan1121 已提交
471
    pStmt->bInfo.boundTags = pCache->boundTags;
D
dapan1121 已提交
472
    pStmt->bInfo.tagsCached = true;
D
dapan1121 已提交
473

D
dapan1121 已提交
474 475
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
476 477 478
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
479
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
D
stmt  
dapan1121 已提交
480
  if (pCache) {
D
stmt  
dapan1121 已提交
481
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
482

D
dapan1121 已提交
483 484 485
    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
D
stmt  
dapan1121 已提交
486
    pStmt->bInfo.boundTags = pCache->boundTags;
D
dapan1121 已提交
487
    pStmt->bInfo.tagsCached = true;
D
stmt  
dapan1121 已提交
488

D
stmt  
dapan1121 已提交
489
    STableDataBlocks* pNewBlock = NULL;
D
dapan 已提交
490
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid));
D
stmt  
dapan1121 已提交
491

492 493
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                    POINTER_BYTES)) {
D
stmt  
dapan1121 已提交
494 495
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
X
Xiaoyu Wang 已提交
496

D
dapan1121 已提交
497 498
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
499 500 501
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
502 503
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

D
stmt  
dapan1121 已提交
504 505 506
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520
/*
 * Returns the statement to its initial state: all SQL info is released, a fresh table cache is
 * created and the status is set back to STMT_INIT.
 *
 * Returns TSDB_CODE_SUCCESS, the error of stmtCleanSQLInfo(), or TSDB_CODE_TSC_OUT_OF_MEMORY when
 * the cache cannot be created.
 */
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pCache;
  if (pCache == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
521
/*
 * Allocates and initialises a statement object bound to the connection `taos`.
 *
 * The object is zero-allocated, gets an empty table cache, starts in STMT_INIT with needParse
 * set, and remembers `reqid` for later request creation.
 *
 * Returns the new statement, or NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when either
 * allocation fails.
 */
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
  STscStmt* pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
  if (pStmt == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pStmt->sql.pTableCache == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = (STscObj*)taos;
  pStmt->reqid = reqid;
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;

  STMT_LOG_SEQ(STMT_INIT);

  tscDebug("stmt:%p initialized", pStmt);

  return pStmt;
}
D
dapan1121 已提交
549

X
Xiaoyu Wang 已提交
550
/*
 * Stores the SQL text to be used by the statement.
 *
 * A statement that was already prepared (status >= STMT_PREPARE) is reset first. The status is
 * then switched to STMT_PREPARE and a private copy of at most `length` bytes of `sql` is kept;
 * a `length` of 0 means "use strlen(sql)".
 *
 * The result of strndup() is now checked: previously an allocation failure left
 * pStmt->sql.sqlStr NULL while the call still reported success.
 *
 * Returns TSDB_CODE_SUCCESS, the error of stmtResetStmt() / stmtSwitchStatus(), or
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the SQL text cannot be copied.
 */
int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to prepare");

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // `length` is unsigned, so "no length given" can only be 0.
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = strndup(sql, length);
  if (NULL == pStmt->sql.sqlStr) {
    pStmt->sql.sqlLen = 0;
    STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  pStmt->sql.sqlLen = length;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
571
/*
 * Sets the table that following tag / column binds apply to.
 *
 * Switches the status to STMT_SETTBNAME, rejects non-insert statements, makes sure a request
 * exists, resolves `tbName` into pStmt->bInfo.sname and its full name, and consults
 * stmtGetFromCache(). Only when that lookup leaves needParse set is the plain table name stored
 * and the SQL parsed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for non-insert statements, or the
 * first error reported by the called functions.
 */
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG("start to set tbName: %s", tbName);

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  stmtIsInsert(stmt, &isInsert);
  if (!isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);

  STMT_ERR_RET(stmtGetFromCache(pStmt));

  if (!pStmt->bInfo.needParse) {
    return TSDB_CODE_SUCCESS;
  }

  // Cache miss: remember the plain name (the parser fetches it via stmtGetTbName) and parse.
  const size_t lastIdx = sizeof(pStmt->bInfo.tbName) - 1;
  strncpy(pStmt->bInfo.tbName, tbName, lastIdx);
  pStmt->bInfo.tbName[lastIdx] = 0;

  STMT_ERR_RET(stmtParseSql(pStmt));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
603
/*
 * Binds tag values for the current table.
 *
 * Switches the status to STMT_SETTAGS. Nothing more is done when the table was flagged as
 * already present in the exec cache. Otherwise the table's data block is looked up in the exec
 * block hash, the tag values are bound with qBindStmtTagsValue() and exec.autoCreateTbl is set.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when the table has no block in the exec
 * hash, or the error of stmtSwitchStatus() / qBindStmtTagsValue().
 */
int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to set tbTags");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.inExecCache) {
    return TSDB_CODE_SUCCESS;
  }

  const char*        fullName = pStmt->bInfo.tbFName;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, fullName, strlen(fullName));
  if (ppBlock == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  tscDebug("start to bind stmt tag values");
  STMT_ERR_RET(qBindStmtTagsValue(*ppBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                  pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen));

  pStmt->exec.autoCreateTbl = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
631
/*
 * Builds the list of tag fields of the current table through qBuildStmtTagFields().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for query statements,
 * TSDB_CODE_QRY_APP_ERROR when the table has no block in the exec block hash, or the error of
 * qBuildStmtTagFields().
 */
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->sql.type == STMT_TYPE_QUERY) {
    tscError("invalid operation to get query tag fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*        fullName = pStmt->bInfo.tbFName;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, fullName, strlen(fullName));
  if (ppBlock == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtTagFields(*ppBlock, pStmt->bInfo.boundTags, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
648

D
dapan1121 已提交
649
/*
 * Builds the list of column fields of the current table through qBuildStmtColFields().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for query statements,
 * TSDB_CODE_QRY_APP_ERROR when the table has no block in the exec block hash, or the error of
 * qBuildStmtColFields().
 */
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->sql.type == STMT_TYPE_QUERY) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*        fullName = pStmt->bInfo.tbFName;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, fullName, strlen(fullName));
  if (ppBlock == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
667
/*
 * Binds parameter values to the statement.
 *
 * colIdx < 0 binds all columns at once; colIdx >= 0 binds a single column and must be either 0
 * or exactly one past the previously bound column index. For column 0 the row count of the bind
 * is remembered and used for the columns that follow.
 *
 * Steps: switch the status to STMT_BIND; skip re-parsing for statements that already ran and
 * are not multi-table inserts; drop the previous request of a query that already ran; make sure
 * a request exists; parse when still needed. Query statements then bind the parameters into the
 * parsed query, re-parse it, install the result schema / precision on the request and hand the
 * db / table lists over to the request. Insert statements bind into the current table's block
 * from the exec block hash.
 *
 * The single-column path now checks the result of qBindStmtSingleColValue(); it used to be
 * ignored, so a failed column bind was reported as success (the all-columns path already
 * checked its result).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when the table has no exec block or the
 * column index is out of sequence, or the first error reported by a called function.
 */
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx));

    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));

    if (pStmt->sql.pQuery->haveResultSet) {
      setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                       pStmt->sql.pQuery->numOfResCols);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    // Swap the name lists between the parsed query and the request.
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks** pDataBlock =
      (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (NULL == pDataBlock) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (colIdx < 0) {
    int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    int32_t code = qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf,
                                           pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
761
// Adds the currently bound rows as a batch: moves the statement to the
// STMT_ADD_BATCH status and then hands it to stmtCacheBlock().
// Returns TSDB_CODE_SUCCESS when both steps succeed.
// NOTE(review): STMT_ERR_RET presumably returns the failing code to the caller
// right away -- confirm against the macro definition.
int stmtAddBatch(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;  // the log/status macros expect a local named pStmt

  STMT_DLOG_E("start to add batch");

  // The status switch must pass before the block is cached.
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}

773
// Fills in the table uid for every data block in pStmt->exec.pBlockHash whose
// table meta still has uid == 0.
//
// For each such block the uid is taken from the entry of pRsp whose tblFName
// equals the hash key; when no entry matches, the table meta is fetched from
// the catalog instead.
//
// pStmt: statement whose exec block hash is walked.
// pRsp:  submit response; its nBlocks / pBlocks are searched by table name.
//
// Returns TSDB_CODE_SUCCESS on success, also when the catalog reports
// TSDB_CODE_PAR_TABLE_NOT_EXIST (the walk stops there, as before). Any other
// failure is returned to the caller.
//
// NOTE(review): the catalog fallback looks up pStmt->bInfo.sname, i.e. the
// statement's current bound table, not the name of the block being iterated.
// With more than one unresolved block in the hash this would assign the wrong
// uid -- confirm whether that can happen.
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
  tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);

  int32_t            code = TSDB_CODE_SUCCESS;
  size_t             keyLen = 0;
  STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
  while (pIter) {
    STableDataBlocks* pBlock = *pIter;
    char*             key = taosHashGetKey(pIter, &keyLen);

    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
    if (pMeta->uid) {
      // uid already known, nothing to update for this block
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
      continue;
    }

    // Look for the response entry whose table name matches the hash key.
    SSubmitBlkRsp* blkRsp = NULL;
    int32_t        i = 0;
    for (; i < pRsp->nBlocks; ++i) {
      blkRsp = pRsp->pBlocks + i;
      if (strlen(blkRsp->tblFName) == keyLen && 0 == strncmp(blkRsp->tblFName, key, keyLen)) {
        break;
      }
    }

    if (i < pRsp->nBlocks) {
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
               blkRsp->uid);

      pMeta->uid = blkRsp->uid;
      pStmt->bInfo.tbUid = blkRsp->uid;
    } else {
      tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
      if (NULL == pStmt->pCatalog) {
        STMT_ERR_JRET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
      }

      STMT_ERR_JRET(stmtCreateRequest(pStmt));

      STableMeta*      pTableMeta = NULL;
      SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                               .requestId = pStmt->exec.pRequest->requestId,
                               .requestObjRefId = pStmt->exec.pRequest->self,
                               .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
      code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);

      // The request was only needed for the catalog lookup.
      taos_free_result(pStmt->exec.pRequest);
      pStmt->exec.pRequest = NULL;

      if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
        tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
        taosMemoryFree(pTableMeta);
        code = TSDB_CODE_SUCCESS;
        goto _return;
      }

      if (TSDB_CODE_SUCCESS != code) {
        // Any other failure previously fell through and dereferenced a NULL meta.
        tscError("get table meta of %s failed, error:%s", pStmt->bInfo.tbFName, tstrerror(code));
        taosMemoryFree(pTableMeta);
        STMT_ERR_JRET(code);
      }

      if (NULL == pTableMeta) {
        tscError("no table meta got for %s", pStmt->bInfo.tbFName);
        STMT_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
      }

      pMeta->uid = pTableMeta->uid;
      pStmt->bInfo.tbUid = pTableMeta->uid;

      // The meta returned by the catalog is owned by the caller.
      taosMemoryFree(pTableMeta);
    }

    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
  }

_return:

  // pIter is only non-NULL here when the walk was abandoned half way; the
  // iterator must then be released explicitly.
  if (NULL != pIter) {
    taosHashCancelIterate(pStmt->exec.pBlockHash, pIter);
  }

  return code;
}

842 843 844 845 846
// Executes the statement.
//
// Query statements are launched directly. For all other statement types the
// output is first built with qBuildStmtOutput(), and the submit response is
// collected when the statement auto-creates tables.
//
// When the request fails with an error for which NEED_CLIENT_HANDLE_ERROR()
// is true, the meta is refreshed; if the refresh succeeds the statement is
// reset and TSDB_CODE_NEED_RETRY is reported.
//
// Returns the resulting code through STMT_RET().
int stmtExec(TAOS_STMT* stmt) {
  STscStmt*   pStmt = (STscStmt*)stmt;
  SSubmitRsp* pRsp = NULL;
  int32_t     code = 0;
  // Sampled up front: it is read again after stmtCleanExecInfo() has run.
  bool        autoCreateTbl = pStmt->exec.autoCreateTbl;

  STMT_DLOG_E("start to exec");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  // Only non-query statements that auto-create tables ask for the response.
  void** ppRsp = NULL;
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
    if (autoCreateTbl) {
      ppRsp = (void**)&pRsp;
    }
  }
  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, ppRsp);

  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
    if (!code) {
      // Meta refreshed: drop the response, reset and let the caller retry.
      tFreeSSubmitRsp(pRsp);
      STMT_ERR_RET(stmtResetStmt(pStmt));
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
    } else {
      pStmt->exec.pRequest->code = code;
    }
  }

  STMT_ERR_JRET(pStmt->exec.pRequest->code);

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

_return:

  stmtCleanExecInfo(pStmt, (TSDB_CODE_SUCCESS == code), false);

  if (autoCreateTbl && TSDB_CODE_SUCCESS == code) {
    if (NULL != pRsp) {
      code = stmtUpdateTableUid(pStmt, pRsp);
    } else {
      tscError("no submit resp got for auto create table");
      code = TSDB_CODE_TSC_APP_ERROR;
    }
  }

  tFreeSSubmitRsp(pRsp);

  // Counted on every pass through here, whether or not execution succeeded.
  ++pStmt->sql.runTimes;

  STMT_RET(code);
}

X
Xiaoyu Wang 已提交
895
// Closes the statement: cleans its SQL info via stmtCleanSQLInfo() and then
// frees the statement object itself. Always returns TSDB_CODE_SUCCESS.
// The handle must not be used after this call.
int stmtClose(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  stmtCleanSQLInfo(pStmt);

  // The statement object is released last, after its contents were cleaned.
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
904
// Returns the error string for the statement.
//
// When the statement has a request, terrno is first stored into the request's
// code and the string comes from taos_errstr(); without a statement or a
// request the string for terrno is returned directly.
const char* stmtErrstr(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (NULL != stmt && NULL != pStmt->exec.pRequest) {
    // Overwrites the request's code with the current terrno before formatting.
    pStmt->exec.pRequest->code = terrno;
    return taos_errstr(pStmt->exec.pRequest);
  }

  return (char*)tstrerror(terrno);
}

X
Xiaoyu Wang 已提交
916
// Returns the statement's affectedRows counter, which stmtExec() increases by
// each execution's affected rows.
int stmtAffectedRows(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->affectedRows;
}
D
stmt  
dapan1121 已提交
917

X
Xiaoyu Wang 已提交
918
// Returns exec.affectedRows, the per-execution counter that stmtExec() sets
// from taos_affected_rows().
int stmtAffectedRowsOnce(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->exec.affectedRows;
}
D
dapan1121 已提交
919

X
Xiaoyu Wang 已提交
920
// Reports through *insert whether the statement is an insert.
//
// When the statement type is already set, the answer is whether it is
// STMT_TYPE_INSERT or STMT_TYPE_MULTI_INSERT; otherwise the SQL text is
// inspected with qIsInsertValuesSql(). Always returns TSDB_CODE_SUCCESS.
int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start is insert");

  if (!pStmt->sql.type) {
    // Type not set yet: decide from the SQL string.
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
    return TSDB_CODE_SUCCESS;
  }

  *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
934 935 936
// Fetches the tag fields of the statement into *nums / *fields via
// stmtFetchTagFields().
//
// Query statements are rejected with TSDB_CODE_TSC_STMT_API_ERROR. The
// statement is switched to STMT_FETCH_FIELDS, a request is created if none
// exists, and the SQL is parsed first when bInfo.needParse is still set.
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get tag fields");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran, has a known type and is not a multi-insert
  // does not get parsed again.
  const bool skipParse =
      pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && skipParse) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the previous request of an already executed query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields));

  return TSDB_CODE_SUCCESS;
}

// Fetches the column fields of the statement into *nums / *fields via
// stmtFetchColFields().
//
// Query statements are rejected with TSDB_CODE_TSC_STMT_API_ERROR. The
// statement is switched to STMT_FETCH_FIELDS, a request is created if none
// exists, and the SQL is parsed first when bInfo.needParse is still set.
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get col fields");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran, has a known type and is not a multi-insert
  // does not get parsed again.
  const bool skipParse =
      pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && skipParse) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the previous request of an already executed query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
998
// Stores the number of parameters of the statement in *nums.
//
// For query statements this is the size of the query's placeholder value
// array; for every other type it is the count reported by
// stmtFetchColFields(). The statement is switched to STMT_FETCH_FIELDS, a
// request is created if none exists, and the SQL is parsed first when
// bInfo.needParse is still set.
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get param num");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // A statement that already ran, has a known type and is not a multi-insert
  // does not get parsed again.
  const bool skipParse =
      pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type;
  if (pStmt->bInfo.needParse && skipParse) {
    pStmt->bInfo.needParse = false;
  }

  // Drop the previous request of an already executed query statement.
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
  } else {
    *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  }

  return TSDB_CODE_SUCCESS;
}

1029
// Reports the type and byte size of the idx-th parameter (column field) of
// the statement.
//
// stmt:  statement handle.
// idx:   zero-based parameter index; must satisfy 0 <= idx < field count.
// type:  receives the field's type.
// bytes: receives the field's bytes value.
//
// Query statements are rejected with TSDB_CODE_TSC_STMT_API_ERROR and an
// out-of-range idx with TSDB_CODE_INVALID_PARA. The statement is switched to
// STMT_FETCH_FIELDS, a request is created if none exists, and the SQL is
// parsed first when bInfo.needParse is still set.
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get param");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  int32_t       nums = 0;
  TAOS_FIELD_E* pField = NULL;
  STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));

  // A negative index used to slip past the upper-bound check and read before
  // the start of the field array.
  if (idx < 0 || idx >= nums) {
    tscError("idx %d is out of range, param num %d", idx, nums);
    taosMemoryFree(pField);
    STMT_ERR_RET(TSDB_CODE_INVALID_PARA);
  }

  *type = pField[idx].type;
  *bytes = pField[idx].bytes;

  // The field array was fetched only for this lookup.
  taosMemoryFree(pField);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1073
// Returns the statement's request object as the result handle.
// Only query statements have a result; for any other statement type an error
// is logged and NULL is returned.
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}