未验证 提交 0d4913e0 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #6265 from taosdata/feature/TD-4385

[TD-4385]prepare with auto create table
......@@ -26,7 +26,7 @@
| TSDB_CODE_COM_OUT_OF_MEMORY | 0 | 0x0102 | "Out of memory" | -2147483390 |
| TSDB_CODE_COM_INVALID_CFG_MSG | 0 | 0x0103 | "Invalid config message" | -2147483389 |
| TSDB_CODE_COM_FILE_CORRUPTED | 0 | 0x0104 | "Data file corrupted" | -2147483388 |
| TSDB_CODE_TSC_INVALID_SQL | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_OPERATION | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_QHANDLE | 0 | 0x0201 | "Invalid qhandle" | -2147483135 |
| TSDB_CODE_TSC_INVALID_TIME_STAMP | 0 | 0x0202 | "Invalid combination of client/service time" | -2147483134 |
| TSDB_CODE_TSC_INVALID_VALUE | 0 | 0x0203 | "Invalid value in client" | -2147483133 |
......
......@@ -94,6 +94,8 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
......
......@@ -218,11 +218,19 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
* Method: closeStmt
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
#ifdef __cplusplus
}
#endif
......
......@@ -749,7 +749,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
}
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
(*env)->ReleaseStringUTFChars(env, jname, name);
return JNI_SUCCESS;
}
......@@ -762,7 +761,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
......@@ -777,14 +776,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
len = (*env)->GetArrayLength(env, lengthList);
char *lengthArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
char *lengthArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
char *nullArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
char *nullArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
......@@ -799,22 +798,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
b->length = (int32_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
// do nothing
if (!IS_VAR_DATA_TYPE(dataType)) {
int32_t bytes = tDataTypes[dataType].bytes;
for (int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
}
......@@ -878,3 +865,74 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char* curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for(int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
int32_t nTags = (int32_t) numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
......@@ -46,9 +46,13 @@ typedef struct SNormalStmt {
typedef struct SMultiTbStmt {
bool nameSet;
bool tagSet;
uint64_t currentUid;
uint32_t tbNum;
SStrToken tbname;
SStrToken stbname;
SStrToken values;
SArray *tags;
SHashObj *pTableHash;
SHashObj *pTableBlockHashList; // data block for each table
} SMultiTbStmt;
......@@ -1199,6 +1203,184 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return pStmt->pSql->res.code;
}
int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
SSqlCmd *pCmd = &pSql->cmd;
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t index = 0;
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n == 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.n == 1 && sToken.type == TK_QUESTION) {
pStmt->multiTbInsert = true;
pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
pStmt->mtb.tagSet = true;
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n > 0 && sToken.type == TK_VALUES) {
return TSDB_CODE_SUCCESS;
}
if (sToken.n <= 0 || sToken.type != TK_USING) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || ((sToken.type != TK_ID) && (sToken.type != TK_STRING))) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
pStmt->mtb.stbname = sToken;
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_LP) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
pStmt->mtb.tags = taosArrayInit(4, sizeof(SStrToken));
int32_t loopCont = 1;
while (loopCont) {
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
switch (sToken.type) {
case TK_RP:
loopCont = 0;
break;
case TK_VALUES:
return TSDB_CODE_TSC_INVALID_OPERATION;
case TK_QUESTION:
pStmt->mtb.tagSet = false; //continue
default:
taosArrayPush(pStmt->mtb.tags, &sToken);
break;
}
}
if (taosArrayGetSize(pStmt->mtb.tags) <= 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_VALUES) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
pStmt->mtb.values = sToken;
}
return TSDB_CODE_SUCCESS;
}
int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAOS_BIND* tags) {
size_t tagNum = taosArrayGetSize(pStmt->mtb.tags);
size_t size = 1048576;
char *str = calloc(1, size);
size_t len = 0;
int32_t ret = 0;
int32_t j = 0;
while (1) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z);
if (len >= (size -1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
j = 0;
for (size_t i = 0; i < tagNum && len < (size - 1); ++i) {
SStrToken *t = taosArrayGet(pStmt->mtb.tags, i);
if (t->type == TK_QUESTION) {
int32_t l = 0;
if (i > 0) {
str[len++] = ',';
}
if (tags[j].is_null && (*tags[j].is_null)) {
ret = converToStr(str + len, TSDB_DATA_TYPE_NULL, NULL, -1, &l);
} else {
if (tags[j].buffer == NULL) {
free(str);
tscError("empty");
return TSDB_CODE_TSC_APP_ERROR;
}
ret = converToStr(str + len, tags[j].buffer_type, tags[j].buffer, tags[j].length ? (int32_t)*tags[j].length : -1, &l);
}
++j;
if (ret) {
free(str);
return ret;
}
len += l;
} else {
len += (size_t)snprintf(str + len, size - len - 1, i > 0 ? ",%.*s" : "%.*s", t->n, t->z);
}
}
if (len >= (size - 1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
strcat(str, ") ");
len += 2;
if ((len + strlen(pStmt->mtb.values.z)) >= (size - 1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
strcat(str, pStmt->mtb.values.z);
break;
}
free(pSql->sqlstr);
pSql->sqlstr = str;
return TSDB_CODE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
......@@ -1291,34 +1473,15 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
registerSqlObj(pSql);
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS) {
int32_t ret = stmtParseInsertTbTags(pSql, pStmt);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t index = 0;
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n == 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.n == 1 && sToken.type == TK_QUESTION) {
pStmt->multiTbInsert = true;
pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
if (pStmt->multiTbInsert) {
return TSDB_CODE_SUCCESS;
}
pStmt->multiTbInsert = false;
memset(&pStmt->mtb, 0, sizeof(pStmt->mtb));
int32_t code = tsParseSql(pSql, true);
......@@ -1335,7 +1498,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
return normalStmtPrepare(pStmt);
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
SSqlObj* pSql = pStmt->pSql;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1383,8 +1546,22 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return TSDB_CODE_SUCCESS;
}
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
if (pStmt->mtb.tagSet) {
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
} else {
if (tags == NULL) {
tscError("No tags set");
return TSDB_CODE_TSC_APP_ERROR;
}
int32_t ret = stmtGenInsertStatement(pSql, pStmt, name, tags);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
pStmt->mtb.nameSet = true;
pStmt->mtb.tagSet = true;
tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......@@ -1432,6 +1609,12 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return code;
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (!pStmt->isInsert) {
......@@ -1449,7 +1632,8 @@ int taos_stmt_close(TAOS_STMT* stmt) {
taosHashCleanup(pStmt->mtb.pTableHash);
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableNameList = NULL;
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
taosArrayDestroy(pStmt->mtb.tags);
}
}
......
......@@ -32,6 +32,67 @@
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len) {
int32_t n = 0;
switch (type) {
case TSDB_DATA_TYPE_NULL:
n = sprintf(str, "null");
break;
case TSDB_DATA_TYPE_BOOL:
n = sprintf(str, (*(int8_t*)buf) ? "true" : "false");
break;
case TSDB_DATA_TYPE_TINYINT:
n = sprintf(str, "%d", *(int8_t*)buf);
break;
case TSDB_DATA_TYPE_SMALLINT:
n = sprintf(str, "%d", *(int16_t*)buf);
break;
case TSDB_DATA_TYPE_INT:
n = sprintf(str, "%d", *(int32_t*)buf);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = sprintf(str, "%f", GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
}
*str = '"';
memcpy(str + 1, buf, bufSize);
*(str + bufSize + 1) = '"';
n = bufSize + 2;
break;
default:
tscError("unsupported type:%d", type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
*len = n;
return TSDB_CODE_SUCCESS;
}
static void tscStrToLower(char *str, int32_t n) {
if (str == NULL || n <= 0) { return;}
for (int32_t i = 0; i < n; i++) {
......
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
......@@ -310,6 +310,16 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
}
private native int setTableNameTagsImp(long stmt, String name, int numOfTags, byte[] tags, byte[] typeList, byte[] lengthList, byte[] nullList, long conn);
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
......
......@@ -41,6 +41,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private boolean isPrepared;
private ArrayList<ColumnInfo> colData;
private ArrayList<TableTagInfo> tableTags;
private int tagValueLength;
private String tableName;
private long nativeStmtHandle = 0;
......@@ -63,8 +66,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.
this.colData = new ArrayList<ColumnInfo>(parameterCnt - 1);
this.colData.addAll(Collections.nCopies(parameterCnt - 1, null));
this.colData = new ArrayList<ColumnInfo>();
this.tableTags = new ArrayList<TableTagInfo>();
}
}
......@@ -562,11 +565,109 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
};
private static class TableTagInfo {
private boolean isNull;
private Object value;
private int type;
public TableTagInfo(Object value, int type) {
this.value = value;
this.type = type;
}
public static TableTagInfo createNullTag(int type) {
TableTagInfo info = new TableTagInfo(null, type);
info.isNull = true;
return info;
}
};
public void setTableName(String name) {
this.tableName = name;
}
private void ensureTagCapacity(int index) {
if (this.tableTags.size() < index + 1) {
int delta = index + 1 - this.tableTags.size();
this.tableTags.addAll(Collections.nCopies(delta, null));
}
}
public void setTagNull(int index, int type) {
ensureTagCapacity(index);
this.tableTags.set(index, TableTagInfo.createNullTag(type));
}
public void setTagBoolean(int index, boolean value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BOOL));
this.tagValueLength += Byte.BYTES;
}
public void setTagInt(int index, int value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_INT));
this.tagValueLength += Integer.BYTES;
}
public void setTagByte(int index, byte value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TINYINT));
this.tagValueLength += Byte.BYTES;
}
public void setTagShort(int index, short value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_SMALLINT));
this.tagValueLength += Short.BYTES;
}
public void setTagLong(int index, long value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BIGINT));
this.tagValueLength += Long.BYTES;
}
public void setTagTimestamp(int index, long value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP));
this.tagValueLength += Long.BYTES;
}
public void setTagFloat(int index, float value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_FLOAT));
this.tagValueLength += Float.BYTES;
}
public void setTagDouble(int index, double value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_DOUBLE));
this.tagValueLength += Double.BYTES;
}
public void setTagString(int index, String value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BINARY));
this.tagValueLength += value.getBytes().length;
}
public void setTagNString(int index, String value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_NCHAR));
String charset = TaosGlobalConfig.getCharset();
try {
this.tagValueLength += value.getBytes(charset).length;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type, int bytes) throws SQLException {
if (this.colData.size() == 0) {
this.colData.addAll(Collections.nCopies(this.parameters.length - 1 - this.tableTags.size(), null));
}
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
if (col == null) {
ColumnInfo p = new ColumnInfo();
......@@ -641,7 +742,122 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
if (this.tableTags == null) {
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} else {
int num = this.tableTags.size();
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer typeList = ByteBuffer.allocate(num);
typeList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < num; ++i) {
TableTagInfo tag = this.tableTags.get(i);
if (tag.isNull) {
typeList.put((byte) tag.type);
isNullList.putInt(1);
lengthList.putLong(0);
continue;
}
switch (tag.type) {
case TSDBConstants.TSDB_DATA_TYPE_INT: {
Integer val = (Integer) tag.value;
tagDataList.putInt(val);
lengthList.putLong(Integer.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
Byte val = (Byte) tag.value;
tagDataList.put(val);
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
Boolean val = (Boolean) tag.value;
tagDataList.put((byte) (val ? 1 : 0));
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
Short val = (Short) tag.value;
tagDataList.putShort(val);
lengthList.putLong(Short.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
Long val = (Long) tag.value;
tagDataList.putLong(val == null ? 0 : val);
lengthList.putLong(Long.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
Float val = (Float) tag.value;
tagDataList.putFloat(val == null ? 0 : val);
lengthList.putLong(Float.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
Double val = (Double) tag.value;
tagDataList.putDouble(val == null ? 0 : val);
lengthList.putLong(Double.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset();
String val = (String) tag.value;
byte[] b = null;
try {
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes();
} else {
b = val.getBytes(charset);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
tagDataList.put(b);
lengthList.putLong(b.length);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
case TSDBConstants.TSDB_DATA_TYPE_UBIGINT: {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
}
}
typeList.put((byte) tag.type);
isNullList.putInt(tag.isNull? 1 : 0);
}
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList,
typeList, lengthList, isNullList);
}
ColumnInfo colInfo = (ColumnInfo) this.colData.get(0);
if (colInfo == null) {
......
......@@ -112,6 +112,7 @@ typedef struct TAOS_MULTI_BIND {
TAOS_STMT *taos_stmt_init(TAOS *taos);
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......
......@@ -74,7 +74,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) //"Ref is not there")
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement")
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
#define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time")
#define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client")
......
......@@ -2860,7 +2860,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (num != pInfo->numOfTables + pInfo->numOfVgroups) {
mError("msg:%p, app:%p, failed to get multi-tableMeta, msg inconsistent", pMsg, pMsg->rpcMsg.ahandle);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
goto _end;
}
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
......@@ -2868,7 +2868,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta = rpcMallocCont(totalMallocLen);
if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _error;
goto _end;
}
pMultiMeta->contLen = sizeof(SMultiTableMeta);
......@@ -2878,12 +2878,11 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
for (; t < pInfo->numOfTables; ++t) {
char *fullName = nameList[t];
pMsg->pVgroup = NULL;
pMsg->pTable = mnodeGetTable(fullName);
if (pMsg->pTable == NULL) {
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
goto _end;
}
if (pMsg->pDb == NULL) {
......@@ -2893,7 +2892,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mnodeDecTableRef(pMsg->pTable);
code = TSDB_CODE_APP_NOT_READY;
goto _error;
goto _end;
}
int remain = totalMallocLen - pMultiMeta->contLen;
......@@ -2903,7 +2902,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (pMultiMeta == NULL) {
mnodeDecTableRef(pMsg->pTable);
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _error;
goto _end;
}
}
......@@ -2911,19 +2910,26 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
taosArrayPush(pList, &fullName);// keep the full name for each super table for retrieve vgroup list
taosArrayPush(pList, &fullName); // keep the full name for each super table for retrieve vgroup list
} else {
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
if (pMsg->pVgroup != NULL) {
mnodeDecVgroupRef(pMsg->pVgroup);
pMsg->pVgroup = NULL;
}
}
mnodeDecTableRef(pMsg->pTable);
pMsg->pTable = NULL;
if (code == TSDB_CODE_SUCCESS) {
pMultiMeta->numOfTables ++;
pMultiMeta->numOfTables++;
pMultiMeta->contLen += pMeta->contLen;
} else {
// ignore error and continue.
// Otherwise the client may found that the responding message is inconsistent.
// goto _end;
}
mnodeDecTableRef(pMsg->pTable);
assert(((SCTableObj*)pMsg->pTable)->refCount >= 1);
pMsg->pTable = NULL;
}
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
......@@ -2944,7 +2950,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (pTable == NULL) {
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, name);
code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _error;
goto _end;
}
msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle);
......@@ -2955,15 +2961,18 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
pMsg->rpcRsp.rsp = pMultiMeta;
pMsg->rpcRsp.len = pMultiMeta->contLen;
code = TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
_error:
_end:
tfree(str);
tfree(nameList);
rpcFreeCont(pMultiMeta);
taosArrayDestroy(pList);
pMsg->pTable = NULL;
pMsg->pTable = NULL;
pMsg->pVgroup = NULL;
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMultiMeta);
}
return code;
}
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册