clientStmt.c 18.0 KB
Newer Older
D
dapan1121 已提交
1 2 3

#include "clientInt.h"
#include "clientLog.h"
D
stmt  
dapan1121 已提交
4
#include "clientStmt.h"
D
dapan1121 已提交
5 6
#include "tdef.h"

D
stmt  
dapan1121 已提交
7
/*
 * Check whether the statement may move to `newStatus` and, if so, record it.
 *
 * Every target state lists the current states it may not be entered from.
 * A forbidden transition yields TSDB_CODE_TSC_STMT_API_ERROR; a target state
 * that is not handled below yields TSDB_CODE_TSC_APP_ERROR. pStmt->sql.status
 * is only updated when the transition is accepted.
 *
 * NOTE(review): STMT_STATUS_EQ/NE are defined outside this file; they
 * presumably test pStmt->sql.status against STMT_<name> -- confirm.
 */
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;
  bool    forbidden = false;

  switch (newStatus) {
    case STMT_PREPARE:
      /* no precondition is checked for PREPARE */
      break;
    case STMT_SETTBNAME:
      forbidden = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL);
      break;
    case STMT_SETTAGS:
      forbidden = STMT_STATUS_NE(SETTBNAME);
      break;
    case STMT_FETCH_FIELDS:
      forbidden = STMT_STATUS_EQ(INIT);
      break;
    case STMT_BIND:
      forbidden = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;
    case STMT_BIND_COL:
      forbidden = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;
    case STMT_ADD_BATCH:
      forbidden = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;
    case STMT_EXECUTE:
      forbidden = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      break;
    default:
      code = TSDB_CODE_TSC_APP_ERROR;
      break;
  }

  if (forbidden) {
    code = TSDB_CODE_TSC_STMT_API_ERROR;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;
  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
61 62 63
/*
 * Parser callback: hand out the table name previously stored on the statement.
 *
 * Side effect: the statement type is set to STMT_TYPE_MULTI_INSERT on every
 * call, before the name is checked.
 *
 * @param stmt    statement handle (an STscStmt)
 * @param tbName  out: receives pStmt->bInfo.tbName (not a copy)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_STMT_TBNAME_ERROR when no name
 *         has been set.
 */
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  char* pName = pStmt->bInfo.tbName;
  if (NULL == pName) {
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pName;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/*
 * Copy the result-set field descriptors of the current request into
 * pStmt->sql.fields.
 *
 * The column count is read from the request first and the byte size is
 * derived from it afterwards; computing the size before the count is assigned
 * would size the buffers from whatever numOfCols held previously.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if either buffer
 *         cannot be allocated (in which case neither buffer is kept).
 */
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
  SQueryFields *pFields = &pStmt->sql.fields;

  pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
  int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD);

  pFields->fields = taosMemoryMalloc(size);
  pFields->userFields = taosMemoryMalloc(size);
  if (NULL == pFields->fields || NULL == pFields->userFields) {
    /* do not leak the buffer that did get allocated */
    taosMemoryFreeClear(pFields->fields);
    taosMemoryFreeClear(pFields->userFields);
    STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }

  memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
  memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
92 93 94
/*
 * Parser callback: record the identity of the table being bound.
 *
 * Copies uid, suid and tableType out of `pTableMeta` and stores the `tags`
 * pointer as-is in pStmt->bInfo.boundTags. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tbType = pTableMeta->tableType;
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbUid = pTableMeta->uid;

  return TSDB_CODE_SUCCESS;
}

/*
 * Parser callback: store the vgroup hash and data-block hash pointers on the
 * statement's exec state. The pointers are stored as given; nothing is copied.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->exec.pBlockHash = pBlockHash;
  pStmt->exec.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}

/*
 * Parser callback: return the vgroup hash and data-block hash pointers that
 * are currently stored on the statement's exec state.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  *pBlockHash = pStmt->exec.pBlockHash;
  *pVgHash = pStmt->exec.pVgHash;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
121 122 123 124
/*
 * For multi-table inserts, clone the current table's exec data block into
 * pStmt->sql.pTableCache so it can be reused by later bindings.
 *
 * The cache key is the super-table uid for child tables and the table's own
 * uid otherwise. Nothing is done for other statement types or when the key is
 * already cached. On success the cache entry takes over bInfo.boundTags and
 * the statement's pointer to it is cleared.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_APP_ERROR when the current table
 *         has no block in exec.pBlockHash; the error from
 *         qCloneStmtDataBlock; or TSDB_CODE_OUT_OF_MEMORY when the cache
 *         insert fails.
 */
int32_t stmtCacheBlock(STscStmt *pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t tuid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &tuid, sizeof(tuid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
  if (NULL == pSrc) {
    /* a failed lookup must not be dereferenced below */
    tscError("table uid %" PRIx64 "not found in exec blockHash", uid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STableDataBlocks* pDst = NULL;
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));

  SStmtTableCache cache = {
    .pDataBlock = pDst,
    .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &tuid, sizeof(tuid), &cache, sizeof(cache))) {
    /* the clone never made it into the cache, so nobody else will free it */
    qDestroyStmtDataBlock(pDst);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  /* ownership of the bound-tags info moved into the cache entry */
  pStmt->bInfo.boundTags = NULL;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
152 153 154 155 156 157 158 159
/*
 * Parse the statement's SQL and classify the statement.
 *
 * Builds the request if there is none yet, runs parseSql() with this file's
 * callbacks, clears bInfo.needParse, and derives sql.type from the root node:
 *   - QUERY_NODE_VNODE_MODIF_STMT -> STMT_TYPE_INSERT, unless a type was
 *     already set (stmtGetTbName sets STMT_TYPE_MULTI_INSERT during parsing)
 *   - QUERY_NODE_SELECT_STMT      -> STMT_TYPE_QUERY
 *   - anything else               -> TSDB_CODE_TSC_STMT_CLAUSE_ERROR
 * Finally the current block is offered to the table cache.
 */
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback stmtCb = {
    .pStmt = pStmt,
    .getTbNameFn = stmtGetTbName,
    .setBindInfoFn = stmtSetBindInfo,
    .setExecInfoFn = stmtSetExecInfo,
    .getExecInfoFn = stmtGetExecInfo,
  };

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));

  pStmt->bInfo.needParse = false;

  switch (nodeType(pStmt->sql.pQuery->pRoot)) {
    case QUERY_NODE_SELECT_STMT:
      pStmt->sql.type = STMT_TYPE_QUERY;
      break;
    case QUERY_NODE_VNODE_MODIF_STMT:
      /* keep a type that was already chosen while parsing */
      if (0 == pStmt->sql.type) {
        pStmt->sql.type = STMT_TYPE_INSERT;
      }
      break;
    default:
      tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
  }

  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
187

D
stmt  
dapan1121 已提交
188
/*
 * Reset the per-table binding state of the statement.
 *
 * Releases the stored table name and the bound-tags info, zeroes the table
 * identity fields and marks the statement as needing a parse again.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
  /* release owned memory */
  taosMemoryFreeClear(pStmt->bInfo.tbName);
  destroyBoundColumnInfo(pStmt->bInfo.boundTags);
  taosMemoryFreeClear(pStmt->bInfo.boundTags);

  /* forget which table was bound */
  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.needParse = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
201 202 203 204 205
/*
 * Release execution state after a run.
 *
 * @param keepTable    when true, the block of the currently bound table
 *                     (key == bInfo.tbUid) is reset and kept, every other
 *                     block is freed and removed, and the function returns
 *                     without touching the hash itself or the bind info.
 *                     When false, all blocks are freed, the block hash is
 *                     destroyed and the bind info is cleaned.
 * @param freeRequest  forces the request to be freed; requests of non-query
 *                     statements are freed regardless.
 */
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
  if (freeRequest || STMT_TYPE_QUERY != pStmt->sql.type) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  for (void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter)) {
    STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
    uint64_t *key = taosHashGetKey(pIter, NULL);

    if (keepTable && (*key == pStmt->bInfo.tbUid)) {
      /* current table: keep the block, only reset it */
      STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
    } else {
      qFreeStmtDataBlock(pBlocks);
      taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
    }
  }

  if (!keepTable) {
    taosHashCleanup(pStmt->exec.pBlockHash);
    pStmt->exec.pBlockHash = NULL;

    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release everything hanging off pStmt->sql and then the exec/bind state.
 *
 * Frees the SQL text, parsed query, query plan and node list, destroys every
 * cached table entry together with the cache hash, zeroes the whole sql
 * struct, and finally runs the exec and bind cleanups.
 *
 * NOTE(review): cache entries get destroyBoundColumnInfo() on boundTags but
 * the pointer itself is not freed here, while stmtCleanBindInfo() both
 * destroys and frees bInfo.boundTags -- which stmtGetFromCache() may have
 * pointed at a cache entry's boundTags. Ownership should be confirmed.
 */
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  qDestroyQueryPlan(pStmt->sql.pQueryPlan);
  taosArrayDestroy(pStmt->sql.nodeList);

  for (void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pEntry = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pEntry->pDataBlock);
    destroyBoundColumnInfo(pEntry->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  /* wipes status, type, runTimes and every pointer released above */
  memset(&pStmt->sql, 0, sizeof(pStmt->sql));

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

/*
 * Try to satisfy a table switch from the per-statement table cache so the SQL
 * does not have to be parsed again.
 *
 * bInfo.needParse is set to true up front and cleared only when the table
 * named by bInfo.sname can be served without parsing:
 *   - same uid as the currently bound table: nothing else changes;
 *   - uid present in the table cache: bInfo is repointed at that table and,
 *     unless exec.pBlockHash already holds a block for it, a new exec block
 *     is rebuilt from the cached one and inserted;
 *   - uid not cached: the bind info is cleaned and needParse stays true.
 * If the cache is missing or empty the function returns immediately.
 *
 * @return TSDB_CODE_SUCCESS, a catalog/rebuild error, TSDB_CODE_TSC_APP_ERROR
 *         when the exec hash knows the uid but the cache does not, or
 *         TSDB_CODE_OUT_OF_MEMORY when inserting the rebuilt block fails.
 */
int32_t stmtGetFromCache(STscStmt* pStmt) {
  pStmt->bInfo.needParse = true;

  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
  }

  STableMeta *pTableMeta = NULL;
  SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
  STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));

  /* only these three values are needed; release the meta right away */
  uint64_t uid = pTableMeta->uid;
  uint64_t suid = pTableMeta->suid;
  int8_t tableType = pTableMeta->tableType;
  taosMemoryFree(pTableMeta);

  if (uid == pStmt->bInfo.tbUid) {
    pStmt->bInfo.needParse = false;
    return TSDB_CODE_SUCCESS;
  }

  bool inExecHash = (NULL != taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)));
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));

  if (NULL == pCache) {
    if (inExecHash) {
      /* the separating space keeps the hex uid from running into "found" */
      tscError("table uid %" PRIx64 " found in exec blockHash, but not in sql blockHash", uid);
      STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
    }

    STMT_ERR_RET(stmtCleanBindInfo(pStmt));
    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.needParse = false;

  pStmt->bInfo.tbUid = uid;
  pStmt->bInfo.tbSuid = suid;
  pStmt->bInfo.tbType = tableType;
  pStmt->bInfo.boundTags = pCache->boundTags;

  if (inExecHash) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks* pNewBlock = NULL;
  STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));

  if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
    /* the block was never stored, so release it with the same routine
       stmtCleanExecInfo() uses for exec blocks */
    qFreeStmtDataBlock(pNewBlock);
    STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342 343
/*
 * Bring a used statement back to its freshly-initialised state: clean all SQL
 * state, create a new empty table cache and set the status to STMT_INIT.
 *
 * @return TSDB_CODE_SUCCESS, the error from stmtCleanSQLInfo(), or
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the cache cannot be created.
 */
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pNewCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pNewCache;
  if (NULL == pNewCache) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
344

D
dapan1121 已提交
345 346 347 348 349
/*
 * Allocate a new statement object bound to connection `taos`.
 *
 * The object is zero-allocated, gets an empty table cache, is flagged as
 * needing a parse and starts in STMT_INIT.
 *
 * @return the new statement, or NULL with terrno set to
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
TAOS_STMT *stmtInit(TAOS *taos) {
  STscStmt* pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
  if (NULL == pStmt) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = (STscObj*)taos;
  pStmt->sql.status = STMT_INIT;
  pStmt->bInfo.needParse = true;

  return pStmt;
}
D
dapan1121 已提交
368

D
stmt  
dapan1121 已提交
369 370 371
/*
 * Attach SQL text to the statement.
 *
 * A statement that has already been prepared (status >= STMT_PREPARE) is
 * reset first. The text is copied, so the caller keeps ownership of `sql`.
 *
 * @param sql     SQL text
 * @param length  number of bytes of `sql` to use; 0 means "use strlen(sql)"
 * @return TSDB_CODE_SUCCESS, an error from the reset/state switch, or
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the text cannot be copied.
 */
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  /* length is unsigned, so "not given" can only mean zero */
  if (0 == length) {
    length = strlen(sql);
  }

  char* sqlCopy = strndup(sql, length);
  if (NULL == sqlCopy) {
    /* never store a NULL sqlStr with a non-zero sqlLen */
    STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }

  pStmt->sql.sqlStr = sqlCopy;
  pStmt->sql.sqlLen = length;

  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
389
/*
 * Select the target table of an insert statement.
 *
 * Builds the request if needed, resolves `tbName` into bInfo.sname and asks
 * the table cache whether parsing can be skipped. When a parse is still
 * required, a private copy of `tbName` is stored for the parser callback.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_STMT_API_ERROR for a wrong state or
 *         a non-insert statement; TSDB_CODE_TSC_OUT_OF_MEMORY when the name
 *         cannot be copied; or an error from the helpers called here.
 */
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t insert = 0;
  stmtIsInsert(stmt, &insert);
  if (0 == insert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));

  STMT_ERR_RET(stmtGetFromCache(pStmt));

  if (pStmt->bInfo.needParse) {
    /* drop the previous name first so a failed copy cannot leave a stale one */
    taosMemoryFreeClear(pStmt->bInfo.tbName);
    pStmt->bInfo.tbName = strdup(tbName);
    if (NULL == pStmt->bInfo.tbName) {
      STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
417
/*
 * Bind tag values for the table selected by stmtSetTbName().
 *
 * Parses the SQL first if that is still pending, then looks up the exec data
 * block of the bound table and forwards the tags to qBindStmtTagsValue().
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when the bound table has
 *         no exec block, or an error from the state switch/parse/bind calls.
 */
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  const char*        pKey = (const char*)&pStmt->bInfo.tbUid;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pKey, sizeof(pStmt->bInfo.tbUid));
  if (NULL == ppBlock) {
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBindStmtTagsValue(*ppBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
438
/*
 * Build the tag field descriptors for the bound table of an insert statement.
 *
 * @param fieldNum  out, filled by qBuildStmtTagFields()
 * @param fields    out, filled by qBuildStmtTagFields()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for query
 *         statements, TSDB_CODE_QRY_APP_ERROR when the bound table has no
 *         exec block, or the error from qBuildStmtTagFields().
 */
int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fields) {
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query tag fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*        pKey = (const char*)&pStmt->bInfo.tbUid;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pKey, sizeof(pStmt->bInfo.tbUid));
  if (NULL == ppBlock) {
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtTagFields(*ppBlock, pStmt->bInfo.boundTags, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
454

D
dapan1121 已提交
455
/*
 * Build the column field descriptors for the bound table of an insert
 * statement.
 *
 * @param fieldNum  out, filled by qBuildStmtColFields()
 * @param fields    out, filled by qBuildStmtColFields(); this file also calls
 *                  the function with NULL here (see stmtGetParamNum)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for query
 *         statements, TSDB_CODE_QRY_APP_ERROR when the bound table has no
 *         exec block, or the error from qBuildStmtColFields().
 */
int32_t stmtFetchColFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fields) {
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*        pKey = (const char*)&pStmt->bInfo.tbUid;
  STableDataBlocks** ppBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pKey, sizeof(pStmt->bInfo.tbUid));
  if (NULL == ppBlock) {
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
472
/*
 * Bind parameter values to the statement.
 *
 * For query statements the values are bound into the query plan (the plan is
 * created and the result fields backed up on first use). For insert
 * statements the values are written into the bound table's exec data block:
 * all columns at once when colIdx < 0, otherwise the single column `colIdx`.
 * Single columns must be bound in sequence: either column 0 or the column
 * right after the previously bound one; the row count of column 0 is
 * remembered and passed along for the following columns.
 *
 * Errors reported by the column-binding helpers are returned to the caller
 * instead of being dropped.
 *
 * @param bind    values to bind
 * @param colIdx  column index, or a negative value to bind all columns
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  /* a statement that already ran and is not a multi-table insert is not parsed again */
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  /* a query that already ran gets a fresh request */
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    if (NULL == pStmt->sql.pQueryPlan) {
      STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
      /* move the plan from the request to the statement */
      pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
      pStmt->exec.pRequest->body.pDag = NULL;
      STMT_ERR_RET(stmtBackupQueryFields(pStmt));
    }

    STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId));
  }

  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
  if (NULL == pDataBlock) {
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (colIdx < 0) {
    STMT_ERR_RET(qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
    return TSDB_CODE_SUCCESS;
  }

  if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
    tscError("bind column index not in sequence");
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  pStmt->bInfo.sBindLastIdx = colIdx;

  if (0 == colIdx) {
    pStmt->bInfo.sBindRowNum = bind->num;
  }

  STMT_ERR_RET(qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum));

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
531 532 533 534

/*
 * Close the current batch: switch the statement to STMT_ADD_BATCH and offer
 * the current table's block to the table cache.
 *
 * @return TSDB_CODE_SUCCESS, or the error from the state switch or from
 *         stmtCacheBlock().
 */
int stmtAddBatch(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}

/*
 * Execute the prepared statement.
 *
 * Queries are handed to scheduleQuery(); inserts first get their output built
 * by qBuildStmtOutput() and are then launched. The request's result code
 * decides the outcome; on success the affected-row counters are updated.
 * Exec state is cleaned afterwards (keeping the current table's block only on
 * success) and sql.runTimes is incremented.
 *
 * NOTE(review): a failure of the state switch or of qBuildStmtOutput()
 * returns early, skipping the cleanup and the runTimes increment below.
 */
int stmtExec(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t code = 0;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
  } else {
    scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList);
  }

  STMT_ERR_JRET(pStmt->exec.pRequest->code);

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

_return:

  /* keep the current table's block only when execution succeeded */
  stmtCleanExecInfo(pStmt, (0 == code), false);

  ++pStmt->sql.runTimes;

  STMT_RET(code);
}


D
stmt  
dapan1121 已提交
570
/*
 * Destroy a statement: release all SQL/exec/bind state and then the statement
 * object itself. The handle must not be used after this call.
 *
 * The cleanup result is captured first so that the object is freed on every
 * path; returning straight from the cleanup call would leave the free
 * unreachable and leak the statement.
 *
 * @return the result of stmtCleanSQLInfo().
 */
int stmtClose(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  int32_t code = stmtCleanSQLInfo(pStmt);

  taosMemoryFree(stmt);

  STMT_RET(code);
}

D
stmt  
dapan1121 已提交
578
/*
 * Return a message for the most recent error.
 *
 * Without a statement the message for terrno is returned. Otherwise, if a
 * request exists, its code is overwritten with terrno, and the message is
 * taken from taos_errstr() on the request (which may be NULL).
 */
const char *stmtErrstr(TAOS_STMT *stmt) {
  if (NULL == stmt) {
    return (char*) tstrerror(terrno);
  }

  STscStmt* pStmt = (STscStmt*)stmt;

  if (NULL != pStmt->exec.pRequest) {
    pStmt->exec.pRequest->code = terrno;
  }

  return taos_errstr(pStmt->exec.pRequest);
}

D
stmt  
dapan1121 已提交
592
/*
 * Return the statement-level affectedRows counter, which stmtExec() increases
 * by each successful execution's row count.
 */
int stmtAffectedRows(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  return pStmt->affectedRows;
}

D
dapan1121 已提交
596 597 598 599
/*
 * Return exec.affectedRows, the row count stored by the latest successful
 * stmtExec() call.
 */
int stmtAffectedRowsOnce(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  return pStmt->exec.affectedRows;
}

D
stmt  
dapan1121 已提交
600
/*
 * Report whether the statement is an insert.
 *
 * When the statement type is already known it decides the answer (INSERT and
 * MULTI_INSERT count as insert); otherwise the SQL text is inspected with
 * isInsertSql().
 *
 * @param insert  out: non-zero for insert statements
 * @return always TSDB_CODE_SUCCESS
 */
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (!pStmt->sql.type) {
    /* not classified yet: fall back to looking at the SQL text */
    *insert = isInsertSql(pStmt->sql.sqlStr, 0);
    return TSDB_CODE_SUCCESS;
  }

  *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);

  return TSDB_CODE_SUCCESS;
}

/*
 * Report the number of parameters of the statement.
 *
 * Parses the SQL first if that is still pending. For queries the count is the
 * size of the plan's placeholder list (the plan is created on first use); for
 * inserts it is the field count reported by stmtFetchColFields().
 *
 * @param nums  out: parameter count
 * @return TSDB_CODE_SUCCESS or an error from the calls made here.
 */
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    STMT_ERR_RET(stmtFetchColFields(pStmt, nums, NULL));
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStmt->sql.pQueryPlan) {
    STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
    /* move the plan from the request to the statement */
    pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
    pStmt->exec.pRequest->body.pDag = NULL;
  }

  *nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);

  return TSDB_CODE_SUCCESS;
}

/*
 * Return the result handle of a query statement.
 *
 * @return the statement's current request, or NULL (after logging) when the
 *         statement is not a query.
 */
TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}