提交 e1b1b314 编写于 作者: H Haojun Liao

[TD-225]fix bug in prepare stmt

上级 52aed440
......@@ -110,7 +110,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -235,7 +235,7 @@ typedef struct {
int32_t numOfTablesInSubmit;
};
uint32_t insertType;
uint32_t insertType; // TODO remove it
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
......
......@@ -1281,7 +1281,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
}
......@@ -1336,15 +1336,6 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
}
if (tscIsInsertData(pSql->sqlstr)) {
/*
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
*/
if (initial && (pSql->cmd.insertType != TSDB_QUERY_TYPE_STMT_INSERT)) {
pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
}
if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
return ret;
}
......@@ -1398,7 +1389,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return tscInvalidSQLErrMsg(pCmd->payload, "too many rows in sql, total number of rows should be less than 32767", NULL);
}
if ((code = tscMergeTableDataBlocks(pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -255,7 +255,6 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
if (bind->is_null != NULL && *(bind->is_null)) {
setNull(data + param->offset, param->type, param->bytes);
......@@ -697,71 +696,52 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
int32_t alloced = 1, binded = 0;
if (pCmd->batchSize > 0) {
alloced = (pCmd->batchSize + 1) / 2;
binded = pCmd->batchSize / 2;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
size_t size = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < size; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
uint32_t dataSize = totalDataSize / alloced;
assert(dataSize * alloced == totalDataSize);
STableDataBlocks* pBlock = NULL;
int32_t ret =
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
pTableMeta->tableInfo.rowSize, pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
if (ret != 0) {
// todo handle error
}
if (alloced == binded) {
totalDataSize += dataSize + sizeof(SSubmitBlk);
uint32_t totalDataSize = sizeof(SSubmitBlk) + pCmd->batchSize * pBlock->rowSize;
if (totalDataSize > pBlock->nAllocSize) {
const double factor = 1.5;
void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor));
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlock->pData = (char*)tmp;
pBlock->nAllocSize = (uint32_t)(totalDataSize * factor);
}
}
char* data = pBlock->pData + sizeof(SSubmitBlk) + dataSize * binded;
char* data = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * pCmd->batchSize;
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
SParamInfo* param = pBlock->params + j;
int code = doBindParam(data, param, bind + param->idx);
SParamInfo* param = &pBlock->params[j];
int code = doBindParam(data, param, &bind[param->idx]);
if (code != TSDB_CODE_SUCCESS) {
tscDebug("param %d: type mismatch or invalid", param->idx);
return code;
}
}
}
// actual work of all data blocks is done, update block size and numOfRows.
// note we don't do this block by block during the binding process, because
// we cannot recover if something goes wrong.
pCmd->batchSize = binded * 2 + 1;
if (binded < alloced) {
return TSDB_CODE_SUCCESS;
}
size_t total = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size += totalDataSize / alloced;
SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
pSubmit->numOfRows += pSubmit->numOfRows / alloced;
}
return TSDB_CODE_SUCCESS;
}
static int insertStmtAddBatch(STscStmt* stmt) {
SSqlCmd* pCmd = &stmt->pSql->cmd;
if ((pCmd->batchSize % 2) == 1) {
++pCmd->batchSize;
}
return TSDB_CODE_SUCCESS;
}
......@@ -793,50 +773,66 @@ static int insertStmtExecute(STscStmt* stmt) {
if (pCmd->batchSize == 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
if ((pCmd->batchSize % 2) == 1) {
++pCmd->batchSize;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) {
return TSDB_CODE_SUCCESS;
}
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
if (taosHashGetSize(pCmd->pTableBlockHashList) > 0) {
// merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
STableDataBlocks* pBlock = NULL;
int32_t ret =
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
pTableMeta->tableInfo.rowSize, pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
assert(ret == 0);
pBlock->size = sizeof(SSubmitBlk) + pCmd->batchSize * pBlock->rowSize;
SSubmitBlk* pBlk = (SSubmitBlk*) pBlock->pData;
pBlk->numOfRows = pCmd->batchSize;
pBlk->dataLen = 0;
pBlk->uid = pTableMeta->id.uid;
pBlk->tid = pTableMeta->id.tid;
int code = tscMergeTableDataBlocks(stmt->pSql, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
STableDataBlocks* pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// set the next sent data vnode index in data block arraylist
pTableMetaInfo->vgroupIndex = 1;
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
SSqlObj *pSql = stmt->pSql;
SSqlRes *pRes = &pSql->res;
SSqlObj* pSql = stmt->pSql;
SSqlRes* pRes = &pSql->res;
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pRes->qhandle = 0;
pSql->cmd.insertType = 0;
pSql->fetchFp = waitForQueryRsp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
tscDoQuery(pSql);
tscProcessSql(pSql);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql->res.code;
// data block reset
pCmd->batchSize = 0;
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
if (pCmd->pTableNameList && pCmd->pTableNameList[i]) {
tfree(pCmd->pTableNameList[i]);
}
}
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
return pSql->res.code;
}
////////////////////////////////////////////////////////////////////////////////
......@@ -870,8 +866,8 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStmt->pSql = pSql;
return pStmt;
}
......@@ -890,7 +886,9 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;
pSql->fp = waitForQueryRsp;
pSql->cmd.insertType = TSDB_QUERY_TYPE_STMT_INSERT;
pSql->fetchFp = waitForQueryRsp;
pCmd->insertType = TSDB_QUERY_TYPE_STMT_INSERT;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("%p failed to malloc payload buffer", pSql);
......@@ -956,8 +954,9 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->isInsert) {
return insertStmtBindParam(pStmt, bind);
}
} else {
return normalStmtBindParam(pStmt, bind);
}
}
int taos_stmt_add_batch(TAOS_STMT* stmt) {
......@@ -981,7 +980,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt->isInsert) {
ret = insertStmtExecute(pStmt);
} else {
} else { // normal stmt query
char* sql = normalStmtBuildSql(pStmt);
if (sql == NULL) {
ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -995,6 +994,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
free(sql);
}
}
return ret;
}
......
......@@ -483,9 +483,9 @@ int tscProcessSql(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
return pSql->res.code;
}
} else if (pCmd->command < TSDB_SQL_LOCAL) {
} else if (pCmd->command >= TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet;
} else { // local handler
// } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......
......@@ -781,6 +781,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int len = 0;
for (int i = 0; i < num_fields; ++i) {
if (i > 0) {
str[len++] = ' ';
......@@ -838,13 +839,15 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
size_t xlen = 0;
for (xlen = 0; xlen < fields[i].bytes - VARSTR_HEADER_SIZE; xlen++) {
char c = ((char *)row[i])[xlen];
if (c == 0) break;
str[len++] = c;
int32_t charLen = varDataLen(row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
assert(charLen <= fields[i].bytes);
} else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE);
}
str[len] = 0;
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
......@@ -603,6 +603,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
assert(pCmd->numOfClause == 1);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
// todo refactor
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
tstrncpy(pTableMetaInfo->name, pDataBlock->tableName, sizeof(pTableMetaInfo->name));
......@@ -689,7 +690,6 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
if (t1 != NULL) {
*dataBlocks = *t1;
......@@ -785,9 +785,13 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableNameList(SSqlCmd* pCmd) {
static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
pCmd->numOfTables = (int32_t) taosHashGetSize(pCmd->pTableBlockHashList);
if (pCmd->pTableNameList == NULL) {
pCmd->pTableNameList = calloc(pCmd->numOfTables, POINTER_BYTES);
} else {
memset(pCmd->pTableNameList, 0, pCmd->numOfTables * POINTER_BYTES);
}
STableDataBlocks **p1 = taosHashIterate(pCmd->pTableBlockHashList, NULL);
int32_t i = 0;
......@@ -797,10 +801,12 @@ static void extractTableNameList(SSqlCmd* pCmd) {
p1 = taosHashIterate(pCmd->pTableBlockHashList, p1);
}
if (freeBlockMap) {
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
}
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
SSqlCmd* pCmd = &pSql->cmd;
......@@ -880,7 +886,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
pOneTableBlock = *p;
}
extractTableNameList(pCmd);
extractTableNameList(pCmd, freeBlockMap);
// free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList;
......@@ -2196,7 +2202,7 @@ void tscDoQuery(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type;
if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion
if (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_INSERT)) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
return;
}
......
......@@ -2,15 +2,117 @@
#include <iostream>
#include "taos.h"
#include "tglobal.h"
namespace {
static int64_t start_ts = 1433955661000;
void stmtInsertTest() {
TAOS* conn = taos_connect("ubuntu", "root", "taosdata", 0, 0);
if (conn == NULL) {
printf("Failed to connect to DB, reason:%s", taos_errstr(conn));
exit(-1);
}
TAOS_RES* res = taos_query(conn, "use test");
taos_free_result(res);
const char* sql = "insert into t1 values(?, ?, ?, ?)";
TAOS_STMT* stmt = taos_stmt_init(conn);
int32_t ret = taos_stmt_prepare(stmt, sql, 0);
ASSERT_EQ(ret, 0);
//ts timestamp, k int, a binary(11), b nchar(4)
struct {
int64_t ts;
int k;
char* a;
char* b;
} v = {0};
TAOS_BIND params[4];
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer_length = sizeof(v.ts);
params[0].buffer = &v.ts;
params[0].length = &params[0].buffer_length;
params[0].is_null = NULL;
params[1].buffer_type = TSDB_DATA_TYPE_INT;
params[1].buffer_length = sizeof(v.k);
params[1].buffer = &v.k;
params[1].length = &params[1].buffer_length;
params[1].is_null = NULL;
params[2].buffer_type = TSDB_DATA_TYPE_BINARY;
params[2].buffer_length = sizeof(v.a);
params[2].buffer = &v.a;
params[2].is_null = NULL;
params[3].buffer_type = TSDB_DATA_TYPE_NCHAR;
params[3].buffer_length = sizeof(v.b);
params[3].buffer = &v.b;
params[3].is_null = NULL;
v.ts = start_ts + 20;
v.k = 123;
char* str = "abc";
uintptr_t len = strlen(str);
v.a = str;
params[2].length = &len;
params[2].buffer_length = len;
params[2].buffer = str;
char* nstr = "999";
uintptr_t len1 = strlen(nstr);
v.b = nstr;
params[3].buffer_length = len1;
params[3].buffer = nstr;
params[3].length = &len1;
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
if (taos_stmt_execute(stmt) != 0) {
printf("\033[31mfailed to execute insert statement.\033[0m\n");
return;
}
v.ts = start_ts + 30;
v.k = 911;
str = "92";
len = strlen(str);
params[2].length = &len;
params[2].buffer_length = len;
params[2].buffer = str;
nstr = "1920";
len1 = strlen(nstr);
params[3].buffer_length = len1;
params[3].buffer = nstr;
params[3].length = &len1;
taos_stmt_bind_param(stmt, params);
taos_stmt_add_batch(stmt);
ret = taos_stmt_execute(stmt);
if (ret != 0) {
printf("%p\n", ret);
printf("\033[31mfailed to execute insert statement.\033[0m\n");
return;
}
taos_stmt_close(stmt);
taos_close(conn);
}
/* test parse time function */
TEST(testCase, result_field_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
taos_init();
void validateResultFields() {
TAOS* conn = taos_connect("ubuntu", "root", "taosdata", 0, 0);
if (conn == NULL) {
printf("Failed to connect to DB, reason:%s", taos_errstr(conn));
......@@ -134,5 +236,31 @@ TEST(testCase, result_field_test) {
ASSERT_STREQ(fields[6].name, "first(ts)");
taos_free_result(res);
// update the configure parameter, the result field name will be changed
tsKeepOriginalColumnName = 1;
res = taos_query(conn, "select first(ts, a, k, k, b, b, ts) from t1");
ASSERT_EQ(taos_num_fields(res), 7);
fields = taos_fetch_fields(res);
ASSERT_EQ(fields[0].bytes, 8);
ASSERT_EQ(fields[0].type, TSDB_DATA_TYPE_TIMESTAMP);
ASSERT_STREQ(fields[0].name, "ts");
ASSERT_EQ(fields[2].bytes, 4);
ASSERT_EQ(fields[2].type, TSDB_DATA_TYPE_INT);
ASSERT_STREQ(fields[2].name, "k");
taos_free_result(res);
taos_close(conn);
}
}
/* test parse time function */
TEST(testCase, result_field_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
taos_init();
validateResultFields();
stmtInsertTest();
}
......@@ -93,15 +93,15 @@ void Test(TAOS *taos, char *qstr, int index) {
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result = taos_query(taos, qstr);
if (result) {
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1) {
printf("insert row: %i\n", i);
} else {
printf("failed to insert row: %i, reason:%s\n", i, "null result"/*taos_errstr(result)*/);
taos_free_result(result);
taos_free_result(result1);
exit(1);
}
taos_free_result(result);
taos_free_result(result1);
}
printf("success to insert rows, total %d rows\n", i);
......
......@@ -111,7 +111,8 @@ if $rows != 1 then
return -1
endi
if $data00 != 6.000000000 then
if $data00 != NULL then
print expect NULL, actual:$data00
return -1
endi
......@@ -167,7 +168,7 @@ if $data01 != 4 then
return -1
endi
// todo insert more rows and chec it
## todo insert more rows and chec it
sql select first(a),count(b),last(c),sum(b),spread(d),avg(c),min(b),max(a),stddev(a) from mt_unsigned_1;
if $rows != 1 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册