clientStmt.c 14.8 KB
Newer Older
D
dapan1121 已提交
1 2 3

#include "clientInt.h"
#include "clientLog.h"
D
stmt  
dapan1121 已提交
4
#include "clientStmt.h"
D
dapan1121 已提交
5 6
#include "tdef.h"

D
stmt  
dapan1121 已提交
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Record `newStatus` as the current lifecycle status of the statement.
// No transition is rejected at present: every status value is accepted and
// the function always returns TSDB_CODE_SUCCESS.
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
  // TODO: validate the transition here and report TSDB_CODE_TSC_STMT_API_ERROR
  // for an illegal one; currently all transitions are allowed.
  pStmt->sql.status = newStatus;
  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
23 24 25
// Callback registered as `getTbNameFn` in stmtParseSql: reports the table
// name currently bound to the statement.
//
// The statement type is switched to STMT_TYPE_MULTI_INSERT unconditionally,
// i.e. even when the lookup below fails.
//
// On success *tbName points at the statement-owned string (not a copy).
// Fails with TSDB_CODE_TSC_STMT_TBNAME_ERROR when no name has been set.
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if (NULL == pStmt->bInfo.tbName) {
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = pStmt->bInfo.tbName;

  return TSDB_CODE_SUCCESS;
}

// Callback registered as `setBindInfoFn` in stmtParseSql: copies the table
// identity (uid, super-table uid, table type) from `pTableMeta` into the
// statement's bind info and stores the `tags` pointer as the bound tags.
// The `tags` pointer is stored as-is, not copied. Always succeeds.
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->bInfo.tbUid = pTableMeta->uid;
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbType = pTableMeta->tableType;
  pStmt->bInfo.boundTags = tags;

  return TSDB_CODE_SUCCESS;
}

// Callback registered as `setExecInfoFn` in stmtParseSql: stores the vgroup
// hash and the data-block hash in the statement's execution state.
// Both pointers are stored as-is. Always returns TSDB_CODE_SUCCESS.
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt* pSelf = (STscStmt*)stmt;

  pSelf->exec.pBlockHash = pBlockHash;
  pSelf->exec.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}

// Callback registered as `getExecInfoFn` in stmtParseSql: returns the vgroup
// hash and data-block hash currently held in the statement's execution state
// through the two out-parameters. Always returns TSDB_CODE_SUCCESS.
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  *pVgHash = pStmt->exec.pVgHash;
  *pBlockHash = pStmt->exec.pBlockHash;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
67 68 69 70
// Store a clone of the current execution data block (plus the bound tags) in
// the per-statement table cache so later table switches can reuse it.
//
// Does nothing unless the statement is a STMT_TYPE_MULTI_INSERT, and nothing
// if the cache already has an entry for the key. The cache key is the
// super-table uid for child tables and the table uid otherwise.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_APP_ERROR when no execution block
// exists for the key, an error from qCloneStmtDataBlock, or
// TSDB_CODE_OUT_OF_MEMORY when the cache insert fails.
int32_t stmtCacheBlock(STscStmt *pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid;
  if (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) {
    uid = pStmt->bInfo.tbSuid;
  } else {
    ASSERT(TSDB_NORMAL_TABLE == pStmt->bInfo.tbType);
    uid = pStmt->bInfo.tbUid;
  }

  if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
  if (NULL == pSrc) {
    // Without this check a missing entry would be dereferenced below.
    tscError("table uid %" PRIx64 " not found in exec blockHash", uid);
    STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
  }

  STableDataBlocks* pDst = NULL;
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));

  SStmtTableCache cache = {
    .pDataBlock = pDst,
    .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) {
    // The clone was never handed to the cache, so release it here.
    qDestroyStmtDataBlock(pDst);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The cache entry now holds the bound-tags pointer; drop the bind-info
  // reference so stmtCleanBindInfo does not release it.
  pStmt->bInfo.boundTags = NULL;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
103 104 105 106 107 108 109 110
// Parse the prepared SQL text and classify the statement.
//
// Steps: build the request object if none exists, run parseSql with the
// statement callbacks, clear `needParse`, derive the statement type from the
// root node of the parsed query, and finally cache the data block (a no-op
// unless the statement is a multi-table insert).
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_CLAUSE_ERROR for an
// unsupported root node type, or any error reported by the called helpers.
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback stmtCb = {
    .pStmt = pStmt,
    .getTbNameFn = stmtGetTbName,
    .setBindInfoFn = stmtSetBindInfo,
    .setExecInfoFn = stmtSetExecInfo,
    .getExecInfoFn = stmtGetExecInfo,
  };

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));

  pStmt->bInfo.needParse = false;

  switch (nodeType(pStmt->sql.pQuery->pRoot)) {
    case QUERY_NODE_VNODE_MODIF_STMT:
      // Keep a type that was already set (e.g. multi-insert set by
      // stmtGetTbName during parsing); only fill in the default.
      if (0 == pStmt->sql.type) {
        pStmt->sql.type = STMT_TYPE_INSERT;
      }
      break;
    case QUERY_NODE_SELECT_STMT:
      pStmt->sql.type = STMT_TYPE_QUERY;
      break;
    default:
      tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
  }

  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
138

D
stmt  
dapan1121 已提交
139
// Reset the statement's bind info: zero the table identity, mark the
// statement as needing a parse, and release the table name and bound tags.
// Always returns TSDB_CODE_SUCCESS.
//
// NOTE(review): stmtGetFromCache may point bInfo.boundTags at a cache
// entry's boundTags; releasing it here would then leave that cache entry
// dangling - confirm the intended ownership.
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.needParse = true;

  taosMemoryFreeClear(pStmt->bInfo.tbName);
  destroyBoundColumnInfo(pStmt->bInfo.boundTags);
  taosMemoryFreeClear(pStmt->bInfo.boundTags);

  return TSDB_CODE_SUCCESS;
}

// Release per-execution state.
//
// The request object is always freed. Every entry of the execution block
// hash is then visited:
//   - with `keepTable` set, the entry whose key equals the current table uid
//     is reset via qResetStmtDataBlock and kept;
//   - every other entry is freed and removed from the hash.
// With `keepTable` set the function stops there. Otherwise the hash itself
// is destroyed and the bind info is cleaned as well.
//
// Returns TSDB_CODE_SUCCESS or an error from qResetStmtDataBlock /
// stmtCleanBindInfo.
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) {
  taos_free_result(pStmt->exec.pRequest);
  pStmt->exec.pRequest = NULL;

  void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
  while (pIter) {
    STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
    uint64_t *key = taosHashGetKey(pIter, NULL);

    if (keepTable && (*key == pStmt->bInfo.tbUid)) {
      STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
    } else {
      qFreeStmtDataBlock(pBlocks);
      taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
    }

    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
  }

  if (keepTable) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pStmt->exec.pBlockHash);
  pStmt->exec.pBlockHash = NULL;

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

// Release everything tied to the prepared SQL: the SQL text, the parsed
// query, every cached table entry and the cache hash itself. The whole `sql`
// sub-structure is then zeroed, and the execution and bind state are cleaned.
//
// Returns TSDB_CODE_SUCCESS or an error from stmtCleanExecInfo /
// stmtCleanBindInfo.
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);

  void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
  while (pIter) {
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pCache->pDataBlock);
    // NOTE(review): stmtCleanBindInfo follows destroyBoundColumnInfo with a
    // free of the boundTags pointer itself; here only the destroy call is
    // made - confirm whether the cached boundTags object should also be freed.
    destroyBoundColumnInfo(pCache->boundTags);

    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  // Clears status, type, runTimes and the now-dangling pointers in one go.
  memset(&pStmt->sql, 0, sizeof(pStmt->sql));

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

// Try to satisfy a table switch from cached state so the SQL does not have
// to be parsed again.
//
// `needParse` is set to true up front and cleared only when one of these holds
// for the table named by bInfo.sname:
//   1. it is the table already bound (same uid);
//   2. its uid has an entry in the execution block hash (a matching table
//      cache entry must exist too, otherwise TSDB_CODE_TSC_APP_ERROR);
//   3. its uid has an entry in the table cache, in which case an execution
//      block is rebuilt from the cached block and added to the block hash.
// If none applies the bind info is cleaned and `needParse` stays true.
//
// Returns TSDB_CODE_SUCCESS or an error from the catalog, the block rebuild,
// the hash insert (TSDB_CODE_OUT_OF_MEMORY) or stmtCleanBindInfo.
int32_t stmtGetFromCache(STscStmt* pStmt) {
  pStmt->bInfo.needParse = true;

  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
  }

  STableMeta *pTableMeta = NULL;
  SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
  STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));

  // Only these three fields are needed; copy them out and release the meta
  // right away so no return path below can leak it.
  // NOTE(review): assumes the meta returned by catalogGetTableMeta is owned by
  // the caller and released with taosMemoryFree - confirm against the catalog API.
  uint64_t uid = pTableMeta->uid;
  uint64_t suid = pTableMeta->suid;
  int32_t  tableType = pTableMeta->tableType;
  taosMemoryFree(pTableMeta);
  pTableMeta = NULL;

  if (uid == pStmt->bInfo.tbUid) {
    pStmt->bInfo.needParse = false;

    return TSDB_CODE_SUCCESS;
  }

  bool inExecHash = (NULL != taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)));
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));

  if (inExecHash && NULL == pCache) {
    tscError("table uid %" PRIx64 " found in exec blockHash, but not in sql blockHash", uid);
    STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
  }

  if (NULL == pCache) {
    // Unknown table: drop the stale binding; the caller has to parse again.
    STMT_ERR_RET(stmtCleanBindInfo(pStmt));

    return TSDB_CODE_SUCCESS;
  }

  pStmt->bInfo.needParse = false;

  pStmt->bInfo.tbUid = uid;
  pStmt->bInfo.tbSuid = suid;
  pStmt->bInfo.tbType = tableType;
  pStmt->bInfo.boundTags = pCache->boundTags;

  if (inExecHash) {
    // An execution block for this table already exists; nothing to rebuild.
    return TSDB_CODE_SUCCESS;
  }

  STableDataBlocks* pNewBlock = NULL;
  STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));

  if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
    // The block never reached the hash; release it with the same routine
    // stmtCleanExecInfo uses for execution blocks.
    qFreeStmtDataBlock(pNewBlock);
    STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285
// Bring a statement back to its initial state so it can be prepared again:
// all SQL-related state is released, a fresh table cache is created and the
// status becomes STMT_INIT.
// Returns TSDB_CODE_SUCCESS, an error from stmtCleanSQLInfo, or
// TSDB_CODE_TSC_OUT_OF_MEMORY (also stored in terrno) if the cache cannot be
// created.
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pFreshCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pFreshCache;

  if (NULL == pFreshCache) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
286

D
dapan1121 已提交
287 288 289 290 291
// Create a new statement object bound to the connection `taos`.
//
// The object is zero-allocated, gets an empty table cache, is marked as
// needing a parse and starts in status STMT_INIT.
//
// Returns the new handle, or NULL with terrno set to
// TSDB_CODE_TSC_OUT_OF_MEMORY if either allocation fails.
TAOS_STMT *stmtInit(TAOS *taos) {
  STscObj* pObj = (STscObj*)taos;
  STscStmt* pStmt = NULL;

  pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
  if (NULL == pStmt) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = pObj;
  pStmt->bInfo.needParse = true;
  pStmt->sql.status = STMT_INIT;

  return pStmt;
}
D
dapan1121 已提交
310

D
stmt  
dapan1121 已提交
311 312 313
// Attach SQL text to the statement.
//
// A statement that has already reached STMT_PREPARE (or later) is reset
// first. The status then becomes STMT_PREPARE and a private copy of the SQL
// text is stored. A `length` of 0 means "use strlen(sql)".
//
// Returns TSDB_CODE_SUCCESS, an error from the reset / status switch, or
// TSDB_CODE_TSC_OUT_OF_MEMORY when the SQL text cannot be copied.
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  // `length` is unsigned, so "not given" can only be expressed as 0.
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = strndup(sql, length);
  if (NULL == pStmt->sql.sqlStr) {
    // Do not record a length for text that was never stored.
    STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  pStmt->sql.sqlLen = length;

  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
331
// Select the table that subsequent binds apply to.
//
// Switches the status to STMT_SETTBNAME, makes sure a request object exists,
// builds the qualified name into bInfo.sname and then consults the caches
// (stmtGetFromCache). Only when a parse is still required afterwards is the
// plain table name copied into bInfo.tbName, replacing any previous one.
//
// Returns TSDB_CODE_SUCCESS, an error from the called helpers, or
// TSDB_CODE_TSC_OUT_OF_MEMORY when the name cannot be copied.
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));

  STMT_ERR_RET(stmtGetFromCache(pStmt));

  if (pStmt->bInfo.needParse) {
    taosMemoryFree(pStmt->bInfo.tbName);
    pStmt->bInfo.tbName = strdup(tbName);
    if (NULL == pStmt->bInfo.tbName) {
      STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
352
// Bind tag values for the currently selected table.
//
// Switches the status to STMT_SETTAGS, parses the SQL if that is still
// pending, looks up the execution data block of the current table uid and
// hands it, together with the bound tags, to qBindStmtTagsValue.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when the table has no
// execution block, or an error from the called helpers.
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
  if (NULL == pDataBlock) {
    tscError("table uid %" PRIx64 " not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));

  return TSDB_CODE_SUCCESS;
}


// Report the tag fields of the currently selected table.
//
// Switches the status to STMT_FETCH_TAG_FIELDS and parses the SQL if that is
// still pending. Query statements are rejected. The result is produced by
// qBuildStmtTagFields from the table's execution data block and the bound
// tags, and returned through `fieldNum` / `fields`.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for a query
// statement, TSDB_CODE_QRY_APP_ERROR when the table has no execution block,
// or an error from the called helpers.
int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_TAG_FIELDS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query tag fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
  if (NULL == pDataBlock) {
    tscError("table uid %" PRIx64 " not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtTagFields(*pDataBlock, pStmt->bInfo.boundTags, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
397

D
stmt  
dapan1121 已提交
398
// Report the column fields of the currently selected table.
//
// Switches the status to STMT_FETCH_COL_FIELDS and parses the SQL if that is
// still pending. Query statements are rejected. The result is produced by
// qBuildStmtColFields from the table's execution data block and returned
// through `fieldNum` / `fields`.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_STMT_API_ERROR for a query
// statement, TSDB_CODE_QRY_APP_ERROR when the table has no execution block,
// or an error from the called helpers.
int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_COL_FIELDS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fields");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
  if (NULL == pDataBlock) {
    tscError("table uid %" PRIx64 " not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
423
// Bind column values to the currently selected table.
//
// `colIdx < 0` binds all columns at once from `bind`. Otherwise a single
// column is bound; columns must be supplied in order (each index is either 0
// or one past the previous one) and the row count is taken from the bind of
// column 0.
//
// A pending parse is skipped when the statement has already been executed
// and is a known type other than multi-table insert.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_APP_ERROR when the table has no
// execution block or the column index is out of sequence, or an error from
// the called helpers (including the bind routines themselves).
int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int32_t colIdx) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
  if (NULL == pDataBlock) {
    tscError("table uid %" PRIx64 " not found in exec blockHash", pStmt->bInfo.tbUid);
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  if (colIdx < 0) {
    // Propagate bind failures instead of reporting success unconditionally.
    STMT_ERR_RET(qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  } else {
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    if (0 == colIdx) {
      // Column 0 fixes the row count for the columns bound after it.
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    STMT_ERR_RET(qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum));
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
466 467 468 469

// Close the current batch of bound rows: switches the status to
// STMT_ADD_BATCH and caches the data block (stmtCacheBlock, a no-op unless
// the statement is a multi-table insert).
// Returns TSDB_CODE_SUCCESS or an error from the called helpers.
int stmtAddBatch(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));

  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}

// Execute the statement with the data bound so far.
//
// Switches the status to STMT_EXECUTE, builds the output from the vgroup and
// block hashes, launches the query and records the affected-row counts (for
// this run and accumulated). Afterwards the execution state is cleaned: the
// current table's block is kept when the run succeeded and dropped
// otherwise. The run counter is incremented in both cases.
//
// Returns the request's result code, or an earlier error from the status
// switch / output build (in which case no cleanup is performed).
int stmtExec(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t code = 0;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));

  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);

  STMT_ERR_JRET(pStmt->exec.pRequest->code);

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

_return:

  // keepTable only on success: a failed run discards all execution blocks.
  stmtCleanExecInfo(pStmt, (code ? false : true));

  ++pStmt->sql.runTimes;

  STMT_RET(code);
}


D
stmt  
dapan1121 已提交
502
// Destroy a statement created by stmtInit: releases all SQL, execution and
// bind state and then the statement object itself. The handle must not be
// used after this call.
// Returns the result of stmtCleanSQLInfo.
int stmtClose(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  int32_t code = stmtCleanSQLInfo(pStmt);

  // stmtInit allocates the object with taosMemoryCalloc; without this free
  // every closed statement would leak it.
  taosMemoryFree(pStmt);

  STMT_RET(code);
}

D
stmt  
dapan1121 已提交
508
// Return a message describing the most recent error.
//
// With a NULL handle the message for the current terrno is returned.
// Otherwise, if the statement has a request object, terrno is copied into
// the request's code first; the message is then obtained from taos_errstr
// on the request (which may be NULL).
const char *stmtErrstr(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (stmt == NULL) {
    return tstrerror(terrno);
  }

  if (pStmt->exec.pRequest) {
    pStmt->exec.pRequest->code = terrno;
  }

  return taos_errstr(pStmt->exec.pRequest);
}

D
stmt  
dapan1121 已提交
522
// Return the statement's accumulated affected-row count (the value stmtExec
// adds to after each successful run). `stmt` must not be NULL.
int stmtAffectedRows(TAOS_STMT *stmt) {
  return ((STscStmt*)stmt)->affectedRows;
}

// Tell whether the statement is an insert.
//
// When the statement type is already known, *insert is non-zero for
// STMT_TYPE_INSERT and STMT_TYPE_MULTI_INSERT. Otherwise the answer comes
// from isInsertSql applied to the stored SQL text.
// Always returns TSDB_CODE_SUCCESS.
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (pStmt->sql.type) {
    *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
  } else {
    *insert = isInsertSql(pStmt->sql.sqlStr, 0);
  }

  return TSDB_CODE_SUCCESS;
}

// Report the number of bindable parameters through `nums`. Implemented by
// delegating to stmtFetchColFields with a NULL `fields` argument, so only the
// count is requested.
// Returns TSDB_CODE_SUCCESS or the error from stmtFetchColFields.
int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
  STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));

  return TSDB_CODE_SUCCESS;
}

// Always returns NULL; the statement handle is not consulted.
TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
  (void)stmt;  // intentionally unused
  return NULL;
}