未验证 提交 a0e9491a 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4284 from taosdata/feature/query

Feature/query
......@@ -39,9 +39,9 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult);
TAOS_ROW doSetResultRowData(SSqlObj *pSql);
char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId);
#ifdef __cplusplus
}
......
......@@ -313,6 +313,7 @@ typedef struct {
SResRec * pGroupRec;
char * data;
TAOS_ROW tsrow;
TAOS_ROW urow;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex;
......@@ -425,6 +426,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
......@@ -471,8 +473,9 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
UNUSED(pData);
// user defined constant value output columns
// user defined constant value output columns
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
......
......@@ -129,6 +129,14 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: fetchBlockImp
* Signature: (JJLcom/taosdata/jdbc/TSDBResultSetBlockData;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: closeConnectionImp
......
......@@ -17,7 +17,6 @@
#include "taos.h"
#include "tlog.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
......@@ -57,6 +56,10 @@ jmethodID g_rowdataSetStringFp;
jmethodID g_rowdataSetTimestampFp;
jmethodID g_rowdataSetByteArrayFp;
jmethodID g_blockdataSetByteArrayFp;
jmethodID g_blockdataSetNumOfRowsFp;
jmethodID g_blockdataSetNumOfColsFp;
#define JNI_SUCCESS 0
#define JNI_TDENGINE_ERROR -1
#define JNI_CONNECTION_NULL -2
......@@ -66,7 +69,7 @@ jmethodID g_rowdataSetByteArrayFp;
#define JNI_FETCH_END -6
#define JNI_OUT_OF_MEMORY -7
void jniGetGlobalMethod(JNIEnv *env) {
static void jniGetGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init, 0, 1)) {
case 0:
......@@ -114,10 +117,31 @@ void jniGetGlobalMethod(JNIEnv *env) {
g_rowdataSetByteArrayFp = (*env)->GetMethodID(env, g_rowdataClass, "setByteArray", "(I[B)V");
(*env)->DeleteLocalRef(env, rowdataClass);
jclass blockdataClass = (*env)->FindClass(env, "com/taosdata/jdbc/TSDBResultSetBlockData");
jclass g_blockdataClass = (*env)->NewGlobalRef(env, blockdataClass);
g_blockdataSetByteArrayFp = (*env)->GetMethodID(env, g_blockdataClass, "setByteArray", "(II[B)V");
g_blockdataSetNumOfRowsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfRows", "(I)V");
g_blockdataSetNumOfColsFp = (*env)->GetMethodID(env, g_blockdataClass, "setNumOfCols", "(I)V");
(*env)->DeleteLocalRef(env, blockdataClass);
atomic_store_32(&__init, 2);
jniDebug("native method register finished");
}
static int32_t check_for_params(jobject jobj, jlong conn, jlong res) {
if ((TAOS*) conn == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((TAOS_RES *) res == NULL) {
jniError("jobj:%p, conn:%p, res is null", jobj, (TAOS*) conn);
return JNI_RESULT_SET_NULL;
}
return JNI_SUCCESS;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp(JNIEnv *env, jobject jobj, jint jMode,
jstring jPath, jboolean jAutoDump) {
if (jPath != NULL) {
......@@ -192,39 +216,37 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions(JNIEnv
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEnv *env, jobject jobj, jstring jhost,
jint jport, jstring jdbName, jstring juser,
jstring jpass) {
jlong ret = 0;
jlong ret = 0;
const char *host = NULL;
const char *dbname = NULL;
const char *user = NULL;
const char *pass = NULL;
const char *dbname = NULL;
if (jhost != NULL) {
host = (*env)->GetStringUTFChars(env, jhost, NULL);
}
if (jdbName != NULL) {
dbname = (*env)->GetStringUTFChars(env, jdbName, NULL);
}
if (juser != NULL) {
user = (*env)->GetStringUTFChars(env, juser, NULL);
}
if (jpass != NULL) {
pass = (*env)->GetStringUTFChars(env, jpass, NULL);
}
if (user == NULL) {
jniDebug("jobj:%p, user is null, use default user %s", jobj, TSDB_DEFAULT_USER);
jniDebug("jobj:%p, user not specified, use default user %s", jobj, TSDB_DEFAULT_USER);
}
if (pass == NULL) {
jniDebug("jobj:%p, pass is null, use default password", jobj);
jniDebug("jobj:%p, pass not specified, use default password", jobj);
}
/*
* set numOfThreadsPerCore = 0
* means only one thread for client side scheduler
*/
tsNumOfThreadsPerCore = 0.0;
ret = (jlong)taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
ret = (jlong) taos_connect((char *)host, (char *)user, (char *)pass, (char *)dbname, (uint16_t)jport);
if (ret == 0) {
jniError("jobj:%p, conn:%p, connect to database failed, host=%s, user=%s, dbname=%s, port=%d", jobj, (void *)ret,
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
......@@ -233,10 +255,21 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp(JNIEn
(char *)host, (char *)user, (char *)dbname, (int32_t)jport);
}
if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host);
if (dbname != NULL) (*env)->ReleaseStringUTFChars(env, jdbName, dbname);
if (user != NULL) (*env)->ReleaseStringUTFChars(env, juser, user);
if (pass != NULL) (*env)->ReleaseStringUTFChars(env, jpass, pass);
if (host != NULL) {
(*env)->ReleaseStringUTFChars(env, jhost, host);
}
if (dbname != NULL) {
(*env)->ReleaseStringUTFChars(env, jdbName, dbname);
}
if (user != NULL) {
(*env)->ReleaseStringUTFChars(env, juser, user);
}
if (pass != NULL) {
(*env)->ReleaseStringUTFChars(env, jpass, pass);
}
return ret;
}
......@@ -245,64 +278,53 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(
jbyteArray jsql, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is already closed", jobj);
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
if (jsql == NULL) {
jniError("jobj:%p, conn:%p, sql is null", jobj, tscon);
jniError("jobj:%p, conn:%p, empty sql string", jobj, tscon);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, jsql);
char *dst = (char *)calloc(1, sizeof(char) * (len + 1));
if (dst == NULL) {
jniError("jobj:%p, conn:%p, can not alloc memory", jobj, tscon);
char *str = (char *) calloc(1, sizeof(char) * (len + 1));
if (str == NULL) {
jniError("jobj:%p, conn:%p, alloc memory failed", jobj, tscon);
return JNI_OUT_OF_MEMORY;
}
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
jniDebug("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst);
SSqlObj *pSql = taos_query(tscon, dst);
SSqlObj *pSql = taos_query(tscon, str);
int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(pSql));
} else {
int32_t affectRows = 0;
if (pSql->cmd.command == TSDB_SQL_INSERT) {
affectRows = taos_affected_rows(pSql);
int32_t affectRows = taos_affected_rows(pSql);
jniDebug("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows);
} else {
jniDebug("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
}
}
free(dst);
return (jlong)pSql;
free(str);
return (jlong) pSql;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp(JNIEnv *env, jobject jobj, jlong con, jlong tres) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return (jint)TSDB_CODE_TSC_INVALID_CONNECTION;
int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) {
return code;
}
if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
TAOS_RES *pSql = (TAOS_RES *)tres;
return (jint)taos_errno(pSql);
return (jint)taos_errno((TAOS_RES*) tres);
}
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(JNIEnv *env, jobject jobj, jlong tres) {
......@@ -313,23 +335,16 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) {
return code;
}
SSqlObj *pSql = (TAOS_RES *)tres;
STscObj *pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) {
jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, pObj, (void *)tres);
jniDebug("jobj:%p, conn:%p, update query, no resultset, %p", jobj, tscon, (void *)tres);
} else {
jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, pObj, (void *)tres);
jniDebug("jobj:%p, conn:%p, get resultset, %p", jobj, tscon, (void *)tres);
}
return tres;
......@@ -337,15 +352,9 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(JNIEnv *env, jobject jobj, jlong con,
jlong tres) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((void *)tres == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
int32_t code = check_for_params(jobj, con, tres);
if (code != JNI_SUCCESS) {
return code;
}
SSqlObj *pSql = (TAOS_RES *)tres;
......@@ -355,37 +364,27 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp(
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((void *)res == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
taos_free_result((void *)res);
jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, tscon, (void *)res);
jniDebug("jobj:%p, conn:%p, free resultset:%p", jobj, (TAOS*) con, (void *)res);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp(JNIEnv *env, jobject jobj, jlong con,
jlong res) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
if ((void *)res == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
jint ret = taos_affected_rows((SSqlObj *)res);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (void *)con, (void *)res, (int32_t)ret);
jniDebug("jobj:%p, conn:%p, sql:%p, res: %p, affect rows:%d", jobj, tscon, (TAOS *)con, (TAOS_RES *)res, (int32_t)ret);
return ret;
}
......@@ -394,27 +393,20 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaData
jlong con, jlong res,
jobject arrayListObj) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_RES *result = (TAOS_RES *)res;
if (result == NULL) {
jniError("jobj:%p, conn:%p, resultset is null", jobj, tscon);
return JNI_RESULT_SET_NULL;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
int num_fields = taos_num_fields(result);
// jobject arrayListObj = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp, "");
TAOS_RES* tres = (TAOS_RES*) res;
TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t num_fields = taos_num_fields(tres);
if (num_fields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields);
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
return JNI_NUM_OF_FIELDS_0;
} else {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void *)res, num_fields);
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, tres, num_fields);
for (int i = 0; i < num_fields; ++i) {
jobject metadataObj = (*env)->NewObject(env, g_metadataClass, g_metadataConstructFp);
(*env)->SetIntField(env, metadataObj, g_metadataColtypeField, fields[i].type);
......@@ -457,21 +449,21 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
}
TAOS_FIELD *fields = taos_fetch_fields(result);
int num_fields = taos_num_fields(result);
if (num_fields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size is %d", jobj, tscon, (void*)res, num_fields);
int32_t numOfFields = taos_num_fields(result);
if (numOfFields == 0) {
jniError("jobj:%p, conn:%p, resultset:%p, fields size %d", jobj, tscon, (void*)res, numOfFields);
return JNI_NUM_OF_FIELDS_0;
}
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
int tserrno = taos_errno(result);
if (tserrno == 0) {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, num_fields);
int code = taos_errno(result);
if (code == TSDB_CODE_SUCCESS) {
jniDebug("jobj:%p, conn:%p, resultset:%p, fields size is %d, fetch row to the end", jobj, tscon, (void*)res, numOfFields);
return JNI_FETCH_END;
} else {
jniDebug("jobj:%p, conn:%p, interruptted query", jobj, tscon);
jniDebug("jobj:%p, conn:%p, interrupted query", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
}
......@@ -480,7 +472,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
for (int i = 0; i < num_fields; i++) {
for (int i = 0; i < numOfFields; i++) {
if (row[i] == NULL) {
continue;
}
......@@ -534,6 +526,45 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp(JNIEn
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj) {
TAOS * tscon = (TAOS *)con;
int32_t code = check_for_params(jobj, con, res);
if (code != JNI_SUCCESS) {
return code;
}
TAOS_RES * tres = (TAOS_RES *)res;
TAOS_FIELD *fields = taos_fetch_fields(tres);
int32_t numOfFields = taos_num_fields(tres);
assert(numOfFields > 0);
TAOS_ROW row = NULL;
int32_t numOfRows = taos_fetch_block(tres, &row);
if (numOfRows == 0) {
code = taos_errno(tres);
if (code == JNI_SUCCESS) {
jniDebug("jobj:%p, conn:%p, resultset:%p, numOfFields:%d, no data to retrieve", jobj, tscon, (void *)res,
numOfFields);
return JNI_FETCH_END;
} else {
jniDebug("jobj:%p, conn:%p, query interrupted", jobj, tscon);
return JNI_RESULT_SET_NULL;
}
}
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfRowsFp, (jint)numOfRows);
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetNumOfColsFp, (jint)numOfFields);
for (int i = 0; i < numOfFields; i++) {
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, i, fields[i].bytes * numOfRows,
jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes * numOfRows));
}
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionImp(JNIEnv *env, jobject jobj,
jlong con) {
TAOS *tscon = (TAOS *)con;
......@@ -589,7 +620,6 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEn
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
TAOS_RES *res = taos_consume(tsub);
if (res == NULL) {
......@@ -621,16 +651,16 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTab
jsize len = (*env)->GetArrayLength(env, jsql);
char *dst = (char *)calloc(1, sizeof(char) * (len + 1));
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)dst);
char *str = (char *)calloc(1, sizeof(char) * (len + 1));
(*env)->GetByteArrayRegion(env, jsql, 0, len, (jbyte *)str);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
int code = taos_validate_sql(tscon, dst);
int code = taos_validate_sql(tscon, str);
jniDebug("jobj:%p, conn:%p, code is %d", jobj, tscon, code);
free(dst);
free(str);
return code;
}
......
......@@ -341,7 +341,7 @@ TAOS_ROW tscFetchRow(void *param) {
return NULL;
}
void* data = doSetResultRowData(pSql, true);
void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
......
......@@ -1651,7 +1651,7 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
arithSup.pArithExpr = pSup->pArithExprInfo;
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithemicInputSrc);
tExprTreeCalcTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
} else {
SSqlExpr* pExpr = pSup->pSqlExpr;
memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, pExpr->resBytes * pOutput->num);
......
......@@ -1439,19 +1439,6 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
static int tscSetResultPointer(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
pRes->tsrow[i] = (unsigned char*)((char*) pRes->data + offset * pRes->numOfRows);
}
return 0;
}
/*
* this function can only be called once.
* by using pRes->rspType to denote its status
......@@ -1462,15 +1449,18 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->code = TSDB_CODE_SUCCESS;
if (pRes->rspType == 0) {
pRes->numOfRows = numOfRes;
pRes->row = 0;
pRes->rspType = 1;
tscSetResultPointer(pQueryInfo, pRes);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
tscSetResRawPtr(pRes, pQueryInfo);
} else {
tscResetForNextRetrieve(pRes);
}
......@@ -1514,10 +1504,11 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
}
pRes->code = tscDoLocalMerge(pSql);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo);
}
pRes->row = 0;
......@@ -2197,7 +2188,16 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pCmd->command == TSDB_SQL_RETRIEVE) {
tscSetResRawPtr(pRes, pQueryInfo);
} else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscSetResRawPtr(pRes, pQueryInfo);
} else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscSetResRawPtr(pRes, pQueryInfo);
}
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......
......@@ -489,6 +489,27 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (int)((pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows);
}
static bool needToFetchNewBlock(SSqlObj* pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
pCmd->command == TSDB_SQL_CURRENT_DB ||
pCmd->command == TSDB_SQL_SERV_VERSION ||
pCmd->command == TSDB_SQL_CLI_VERSION ||
pCmd->command == TSDB_SQL_CURRENT_USER);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
......@@ -509,77 +530,50 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// set the sql object owner
tscSetSqlOwner(pSql);
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE ||
pCmd->command == TSDB_SQL_SHOW_CREATE_DATABASE ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE ||
pCmd->command == TSDB_SQL_SERV_STATUS ||
pCmd->command == TSDB_SQL_CURRENT_DB ||
pCmd->command == TSDB_SQL_SERV_VERSION ||
pCmd->command == TSDB_SQL_CLI_VERSION ||
pCmd->command == TSDB_SQL_CURRENT_USER )) {
// current data set are exhausted, fetch more result from node
if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
tsem_wait(&pSql->rspSem);
}
void* data = doSetResultRowData(pSql, true);
void* data = doSetResultRowData(pSql);
tscClearSqlOwner(pSql);
return data;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
#if 0
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
int nRows = 0;
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
*rows = NULL;
return 0;
}
// projection query on metric, pipeline retrieve data from vnode list,
// instead of two-stage mergednodeProcessMsgFromShell free qhandle
nRows = taos_fetch_block_impl(res, rows);
// current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++;
pRes->numOfTotal += pRes->numOfClauseTotal;
pRes->numOfClauseTotal = 0;
pRes->rspType = 0;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->subState.numOfSub = 0;
tfree(pSql->pSubs);
if (pRes->qhandle == 0 ||
pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
return 0;
}
assert(pSql->fp == NULL);
tscResetForNextRetrieve(pRes);
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
tscProcessSql(pSql);
// set the sql object owner
tscSetSqlOwner(pSql);
nRows = taos_fetch_block_impl(res, rows);
// current data set are exhausted, fetch more data from node
if (needToFetchNewBlock(pSql)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
tsem_wait(&pSql->rspSem);
}
return nRows;
#endif
*rows = pRes->urow;
(*rows) = taos_fetch_row(res);
return ((*rows) != NULL)? 1:0;
tscClearSqlOwner(pSql);
return (int32_t) pRes->numOfRows;
}
int taos_select_db(TAOS *taos, const char *db) {
......@@ -600,7 +594,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -795,6 +789,25 @@ void taos_stop_query(TAOS_RES *res) {
tscDebug("%p query is cancelled", res);
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return true;
}
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, col);
if (col < 0 || col >= tscNumOfFields(pQueryInfo) || row < 0 || row > pSql->res.numOfRows) {
return true;
}
return isNull(((char*) pSql->res.urow[col]) + row * pInfo->field.bytes, pInfo->field.type);
}
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int len = 0;
for (int i = 0; i < num_fields; ++i) {
......
......@@ -2367,6 +2367,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);
pRes->data = pFilePage->data;
tscSetResRawPtr(pRes, pQueryInfo);
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
......@@ -2379,13 +2380,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->numOfCols = (int16_t)numOfExprs;
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));
if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -2405,7 +2405,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
......@@ -2429,7 +2429,7 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
}
}
char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
int32_t index = -1;
......@@ -2447,7 +2447,7 @@ char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
}
TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -2460,22 +2460,20 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscNumOfFields(pQueryInfo);
int32_t offset = 0;
for (int i = 0; i < size; ++i) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, offset);
TAOS_FIELD *pField = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
offset += pField->bytes;
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->interval.interval > 0) {
continue;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else {
pRes->tsrow[i] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
pRes->length[i] = varDataLen(pRes->urow[i]);
}
if (pRes->tsrow[i] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
transferNcharData(pSql, i, pField);
}
((char**) pRes->urow)[i] += bytes;
}
pRes->row++; // index increase one-step
......
......@@ -220,13 +220,16 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
}
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
size_t numOfOutput = tscNumOfFields(pQueryInfo);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return false;
}
if (numOfOutput == numOfExprs) {
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
}
size_t numOfOutput = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i)->pArithExprInfo;
if (pExprInfo != NULL) {
......@@ -265,16 +268,20 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pRes->tsrow == NULL) {
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pRes->numOfCols = numOfOutput;
pRes->numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
pRes->length = calloc(numOfOutput, sizeof(int32_t));
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->length = calloc(pRes->numOfCols, sizeof(int32_t));
pRes->buffer = calloc(pRes->numOfCols, POINTER_BYTES);
// not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
if (pRes->tsrow == NULL || pRes->urow == NULL || pRes->length == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow);
tfree(pRes->urow);
tfree(pRes->length);
tfree(pRes->buffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code;
}
......@@ -283,6 +290,71 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS;
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
assert(pRes->numOfCols > 0);
int32_t offset = 0;
for (int32_t i = 0; i < pRes->numOfCols; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes;
// generated the user-defined column result
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) {
setNullN(pRes->urow[i], pInfo->field.type, pInfo->field.bytes, (int32_t) pRes->numOfRows);
} else {
if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR || pInfo->field.type == TSDB_DATA_TYPE_BINARY) {
assert(pInfo->pSqlExpr->param[1].nLen <= pInfo->field.bytes);
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(varDataVal(p), pInfo->pSqlExpr->param[1].pz, pInfo->pSqlExpr->param[1].nLen);
varDataSetLen(p, pInfo->pSqlExpr->param[1].nLen);
}
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pSqlExpr->param[1].i64Key, pInfo->field.bytes);
}
}
}
} else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
// string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
if (isNull(p, TSDB_DATA_TYPE_NCHAR)) {
memcpy(dst, p, varDataTLen(p));
} else {
int32_t length = taosUcs4ToMbs(varDataVal(p), varDataLen(p), varDataVal(dst));
varDataSetLen(dst, length);
if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}
p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}
}
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) {
......@@ -297,6 +369,7 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->tsrow);
tfree(pRes->length);
tfree(pRes->buffer);
tfree(pRes->urow);
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
......
......@@ -16,10 +16,10 @@ package com.taosdata.jdbc;
public class ColumnMetaData {
int colType = 0;
String colName = null;
int colSize = -1;
int colIndex = 0;
private int colType = 0;
private String colName = null;
private int colSize = -1;
private int colIndex = 0;
public int getColSize() {
return colSize;
......
......@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.*;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
......@@ -35,11 +34,10 @@ import java.util.*;
import java.util.concurrent.Executor;
public class TSDBConnection implements Connection {
protected Properties props = null;
private TSDBJNIConnector connector = null;
protected Properties props = null;
private String catalog = null;
private TSDBDatabaseMetaData dbMetaData = null;
......@@ -47,15 +45,21 @@ public class TSDBConnection implements Connection {
private Properties clientInfoProps = new Properties();
private int timeoutMilliseconds = 0;
private String tsCharSet = "";
private boolean batchFetch = false;
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta;
connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST),
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME),
info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
String batchLoad = info.getProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD);
if (batchLoad != null) {
this.batchFetch = Boolean.parseBoolean(batchLoad);
}
}
private void connect(String host, int port, String dbName, String user, String password) throws SQLException {
......@@ -223,6 +227,14 @@ public class TSDBConnection implements Connection {
return this.prepareStatement(sql);
}
public Boolean getBatchFetch() {
return this.batchFetch;
}
public void setBatchFetch(Boolean batchFetch) {
this.batchFetch = batchFetch;
}
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
......
......@@ -86,6 +86,11 @@ public class TSDBDriver extends AbstractTaosDriver {
*/
public static final String PROPERTY_KEY_CHARSET = "charset";
/**
* fetch data from native function in a batch model
*/
public static final String PROPERTY_KEY_BATCH_LOAD = "batch";
private TSDBDatabaseMetaData dbMetaData = null;
static {
......@@ -172,26 +177,21 @@ public class TSDBDriver extends AbstractTaosDriver {
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
String oneToken = queryParams.nextToken();
String[] pair = oneToken.split("=");
if ((pair[0] != null && pair[0].trim().length() > 0) && (pair[1] != null && pair[1].trim().length() > 0)) {
urlProps.setProperty(pair[0].trim(), pair[1].trim());
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
// parse database name
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
......@@ -200,6 +200,7 @@ public class TSDBDriver extends AbstractTaosDriver {
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
......@@ -208,9 +209,11 @@ public class TSDBDriver extends AbstractTaosDriver {
}
url = url.substring(0, indexOfColon);
}
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
return urlProps;
}
......
......@@ -243,6 +243,11 @@ public class TSDBJNIConnector {
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
return this.fetchBlockImp(this.taos, resultSet, blockData);
}
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
/**
* Execute close operation from C to release connection pointer by JNI
*
......
......@@ -47,10 +47,14 @@ public class TSDBResultSet implements ResultSet {
private List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
private TSDBResultSetRowData rowData;
private TSDBResultSetBlockData blockData;
private boolean batchFetch = false;
private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1;
private int rowIndex = 0;
public TSDBJNIConnector getJniConnector() {
return jniConnector;
}
......@@ -67,6 +71,14 @@ public class TSDBResultSet implements ResultSet {
this.resultSetPointer = resultSetPointer;
}
public void setBatchFetch(boolean batchFetch) {
this.batchFetch = batchFetch;
}
public Boolean getBatchFetch() {
return this.batchFetch;
}
public List<ColumnMetaData> getColumnMetaDataList() {
return columnMetaDataList;
}
......@@ -94,8 +106,8 @@ public class TSDBResultSet implements ResultSet {
public TSDBResultSet() {
}
public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
this.jniConnector = connecter;
public TSDBResultSet(TSDBJNIConnector connector, long resultSetPointer) throws SQLException {
this.jniConnector = connector;
this.resultSetPointer = resultSetPointer;
int code = this.jniConnector.getSchemaMetaData(this.resultSetPointer, this.columnMetaDataList);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
......@@ -107,6 +119,7 @@ public class TSDBResultSet implements ResultSet {
}
this.rowData = new TSDBResultSetRowData(this.columnMetaDataList.size());
this.blockData = new TSDBResultSetBlockData(this.columnMetaDataList, this.columnMetaDataList.size());
}
public <T> T unwrap(Class<T> iface) throws SQLException {
......@@ -118,21 +131,42 @@ public class TSDBResultSet implements ResultSet {
}
public boolean next() throws SQLException {
if (rowData != null) {
this.rowData.clear();
}
if (this.getBatchFetch()) {
if (this.blockData.forward()) {
return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.reset();
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
} else {
return true;
} else {
if (rowData != null) {
this.rowData.clear();
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_RESULT_SET_NULL));
} else if (code == TSDBConstants.JNI_NUM_OF_FIELDS_0) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_NUM_OF_FIELDS_0));
} else if (code == TSDBConstants.JNI_FETCH_END) {
return false;
} else {
return true;
}
}
}
......@@ -155,21 +189,30 @@ public class TSDBResultSet implements ResultSet {
String res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getString(colIndex);
}
return res;
}
public boolean getBoolean(int columnIndex) throws SQLException {
boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
} else {
return this.blockData.getBoolean(colIndex);
}
return res;
}
......@@ -177,66 +220,91 @@ public class TSDBResultSet implements ResultSet {
byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (byte) this.blockData.getInt(colIndex);
}
return res;
}
public short getShort(int columnIndex) throws SQLException {
short res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (short) this.blockData.getInt(colIndex);
}
return res;
}
public int getInt(int columnIndex) throws SQLException {
int res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getInt(colIndex);
}
return res;
}
public long getLong(int columnIndex) throws SQLException {
long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getLong(colIndex);
}
return res;
}
public float getFloat(int columnIndex) throws SQLException {
float res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return (float) this.blockData.getDouble(colIndex);
}
return res;
}
public double getDouble(int columnIndex) throws SQLException {
double res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
}
return res;
} else {
return this.blockData.getDouble(colIndex);
}
return res;
}
/*
......@@ -249,25 +317,11 @@ public class TSDBResultSet implements ResultSet {
*/
@Deprecated
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
BigDecimal res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
}
return res;
return new BigDecimal(getLong(columnIndex));
}
public byte[] getBytes(int columnIndex) throws SQLException {
byte[] res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
}
return res;
return getString(columnIndex).getBytes();
}
public Date getDate(int columnIndex) throws SQLException {
......@@ -284,11 +338,15 @@ public class TSDBResultSet implements ResultSet {
Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getTimestamp(colIndex);
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getTimestamp(colIndex);
}
return res;
} else {
return this.blockData.getTimestamp(columnIndex);
}
return res;
}
public InputStream getAsciiStream(int columnIndex) throws SQLException {
......@@ -400,8 +458,12 @@ public class TSDBResultSet implements ResultSet {
public Object getObject(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
return this.rowData.get(colIndex);
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
return this.rowData.get(colIndex);
} else {
return this.blockData.get(colIndex);
}
}
public Object getObject(String columnLabel) throws SQLException {
......@@ -433,8 +495,12 @@ public class TSDBResultSet implements ResultSet {
public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
} else {
return new BigDecimal(this.blockData.getLong(colIndex));
}
}
public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
......
/***************************************************************************
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.DoubleBuffer;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class TSDBResultSetBlockData {
private int numOfRows = 0;
private int numOfCols = 0;
private int rowIndex = 0;
private List<ColumnMetaData> columnMetaDataList;
private ArrayList<Object> colData = null;
public TSDBResultSetBlockData(List<ColumnMetaData> colMeta, int numOfCols) {
this.columnMetaDataList = colMeta;
this.setNumOfCols(numOfCols);
}
public TSDBResultSetBlockData() {
this.colData = new ArrayList<Object>();
this.setNumOfCols(0);
}
public void clear() {
if (this.colData != null) {
this.colData.clear();
}
if (this.numOfCols == 0) {
return;
}
}
public int getNumOfRows() {
return this.numOfRows;
}
public void setNumOfRows(int numOfRows) {
this.numOfRows = numOfRows;
}
public int getNumOfCols() {
return numOfCols;
}
public void setNumOfCols(int numOfCols) {
this.numOfCols = numOfCols;
this.clear();
}
public boolean hasMore() {
return this.rowIndex < this.numOfRows;
}
public boolean forward() {
if (this.rowIndex > this.numOfRows) {
return false;
}
return ((++this.rowIndex) < this.numOfRows);
}
public void reset() {
this.rowIndex = 0;
}
public void setBoolean(int col, boolean value) {
colData.set(col, value);
}
public void setByteArray(int col, int length, byte[] value) {
try {
switch (this.columnMetaDataList.get(col).getColType()) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
buf.order(ByteOrder.LITTLE_ENDIAN).asCharBuffer();
this.colData.set(col, buf);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
buf.order(ByteOrder.LITTLE_ENDIAN);
this.colData.set(col, buf);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
ShortBuffer sb = buf.order(ByteOrder.LITTLE_ENDIAN).asShortBuffer();
this.colData.set(col, sb);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_INT: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
IntBuffer ib = buf.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
this.colData.set(col, ib);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
LongBuffer lb = buf.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer();
this.colData.set(col, lb);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
FloatBuffer fb = buf.order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer();
this.colData.set(col, fb);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
DoubleBuffer db = buf.order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer();
this.colData.set(col, db);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
buf.order(ByteOrder.LITTLE_ENDIAN);
this.colData.set(col, buf);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
LongBuffer lb = buf.order(ByteOrder.LITTLE_ENDIAN).asLongBuffer();
this.colData.set(col, lb);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR: {
ByteBuffer buf = ByteBuffer.wrap(value, 0, length);
buf.order(ByteOrder.LITTLE_ENDIAN);
this.colData.set(col, buf);
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static class NullType {
private static final byte NULL_BOOL_VAL = 0x2;
private static final String NULL_STR = "null";
public String toString() {
return NullType.NULL_STR;
}
public static boolean isBooleanNull(byte val) {
return val == NullType.NULL_BOOL_VAL;
}
private static boolean isTinyIntNull(byte val) {
return val == Byte.MIN_VALUE;
}
private static boolean isSmallIntNull(short val) {
return val == Short.MIN_VALUE;
}
private static boolean isIntNull(int val) {
return val == Integer.MIN_VALUE;
}
private static boolean isBigIntNull(long val) {
return val == Long.MIN_VALUE;
}
private static boolean isFloatNull(float val) {
return Float.isNaN(val);
}
private static boolean isDoubleNull(double val) {
return Double.isNaN(val);
}
private static boolean isBinaryNull(byte[] val, int length) {
if (length != Byte.BYTES) {
return false;
}
return val[0] == 0xFF;
}
private static boolean isNcharNull(byte[] val, int length) {
if (length != Integer.BYTES) {
return false;
}
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
}
}
/**
* The original type may not be a string type, but will be converted to by
* calling this method
*
* @param col column index
* @return
* @throws SQLException
*/
public String getString(int col) throws SQLException {
Object obj = get(col);
if (obj == null) {
return new NullType().toString();
}
return obj.toString();
}
public int getInt(int col) {
Object obj = get(col);
if (obj == null) {
return 0;
}
int type = this.columnMetaDataList.get(col).getColType();
switch (type) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
case TSDBConstants.TSDB_DATA_TYPE_TINYINT:
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_INT: {
return (int) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT:
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
return ((Long) obj).intValue();
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
return ((Double) obj).intValue();
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
return Integer.parseInt((String) obj);
}
}
return 0;
}
public boolean getBoolean(int col) throws SQLException {
Object obj = get(col);
if (obj == null) {
return Boolean.FALSE;
}
int type = this.columnMetaDataList.get(col).getColType();
switch (type) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
case TSDBConstants.TSDB_DATA_TYPE_TINYINT:
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_INT: {
return ((int) obj == 0L) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT:
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
return (((Long) obj) == 0L) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
return (((Double) obj) == 0) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
if ("TRUE".compareToIgnoreCase((String) obj) == 0) {
return Boolean.TRUE;
} else if ("FALSE".compareToIgnoreCase((String) obj) == 0) {
return Boolean.TRUE;
} else {
throw new SQLDataException();
}
}
}
return Boolean.FALSE;
}
public long getLong(int col) throws SQLException {
Object obj = get(col);
if (obj == null) {
return 0;
}
int type = this.columnMetaDataList.get(col).getColType();
switch (type) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
case TSDBConstants.TSDB_DATA_TYPE_TINYINT:
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_INT: {
return (int) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT:
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
return (long) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
return ((Double) obj).longValue();
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
return Long.parseLong((String) obj);
}
}
return 0;
}
public Timestamp getTimestamp(int col) {
try {
return new Timestamp(getLong(col));
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
public double getDouble(int col) {
Object obj = get(col);
if (obj == null) {
return 0;
}
int type = this.columnMetaDataList.get(col).getColType();
switch (type) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
case TSDBConstants.TSDB_DATA_TYPE_TINYINT:
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_INT: {
return (int) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT:
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
return (long) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
return (double) obj;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
return Double.parseDouble((String) obj);
}
}
return 0;
}
public Object get(int col) {
int fieldSize = this.columnMetaDataList.get(col).getColSize();
switch (this.columnMetaDataList.get(col).getColType()) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
byte val = bb.get(this.rowIndex);
if (NullType.isBooleanNull(val)) {
return null;
}
return (val == 0x0) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
byte val = bb.get(this.rowIndex);
if (NullType.isTinyIntNull(val)) {
return null;
}
return val;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
ShortBuffer sb = (ShortBuffer) this.colData.get(col);
short val = sb.get(this.rowIndex);
if (NullType.isSmallIntNull(val)) {
return null;
}
return val;
}
case TSDBConstants.TSDB_DATA_TYPE_INT: {
IntBuffer ib = (IntBuffer) this.colData.get(col);
int val = ib.get(this.rowIndex);
if (NullType.isIntNull(val)) {
return null;
}
return val;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
LongBuffer lb = (LongBuffer) this.colData.get(col);
long val = lb.get(this.rowIndex);
if (NullType.isBigIntNull(val)) {
return null;
}
return (long) val;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
FloatBuffer fb = (FloatBuffer) this.colData.get(col);
float val = fb.get(this.rowIndex);
if (NullType.isFloatNull(val)) {
return null;
}
return val;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
DoubleBuffer lb = (DoubleBuffer) this.colData.get(col);
double val = lb.get(this.rowIndex);
if (NullType.isDoubleNull(val)) {
return null;
}
return val;
}
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isBinaryNull(dest, length)) {
return null;
}
return new String(dest);
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR: {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
bb.position(fieldSize * this.rowIndex);
int length = bb.getShort();
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (NullType.isNcharNull(dest, length)) {
return null;
}
try {
String ss = TaosGlobalConfig.getCharset();
return new String(dest, ss);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return 0;
}
}
......@@ -218,5 +218,4 @@ public class TSDBResultSetRowData {
public void setData(ArrayList<Object> data) {
this.data = (ArrayList<Object>) data.clone();
}
}
......@@ -19,7 +19,7 @@ import java.util.ArrayList;
import java.util.List;
public class TSDBStatement implements Statement {
private TSDBJNIConnector connecter = null;
private TSDBJNIConnector connector = null;
/**
* To store batched commands
......@@ -45,9 +45,9 @@ public class TSDBStatement implements Statement {
this.connection = connection;
}
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connecter) {
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connector) {
this.connection = connection;
this.connecter = connecter;
this.connector = connector;
this.isClosed = false;
}
......@@ -65,25 +65,27 @@ public class TSDBStatement implements Statement {
}
// TODO make sure it is not a update query
pSql = this.connecter.executeQuery(sql);
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
// create/insert/update/delete/alter
if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
return null;
}
if (!this.connecter.isUpdateQuery(pSql)) {
return new TSDBResultSet(this.connecter, resultSetPointer);
if (!this.connector.isUpdateQuery(pSql)) {
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
res.setBatchFetch(this.connection.getBatchFetch());
return res;
} else {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
return null;
}
......@@ -95,28 +97,28 @@ public class TSDBStatement implements Statement {
}
// TODO check if current query is update query
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
this.affectedRows = this.connecter.getAffectedRows(pSql);
this.connecter.freeResultSet(pSql);
this.affectedRows = this.connector.getAffectedRows(pSql);
this.connector.freeResultSet(pSql);
return this.affectedRows;
}
public String getErrorMsg(long pSql) {
return this.connecter.getErrMsg(pSql);
return this.connector.getErrMsg(pSql);
}
public void close() throws SQLException {
if (!isClosed) {
if (!this.connecter.isResultsetClosed()) {
this.connecter.freeResultSet();
if (!this.connector.isResultsetClosed()) {
this.connector.freeResultSet();
}
isClosed = true;
}
......@@ -136,7 +138,7 @@ public class TSDBStatement implements Statement {
}
public void setMaxRows(int max) throws SQLException {
// always set maxRows to zero, meaning unlimitted rows in a resultSet
// always set maxRows to zero, meaning unlimited rows in a resultSet
}
public void setEscapeProcessing(boolean enable) throws SQLException {
......@@ -172,15 +174,15 @@ public class TSDBStatement implements Statement {
throw new SQLException("Invalid method call on a closed statement.");
}
boolean res = true;
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
// no result set is retrieved
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
res = false;
}
......@@ -191,10 +193,10 @@ public class TSDBStatement implements Statement {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
long resultSetPointer = connecter.getResultSet();
long resultSetPointer = connector.getResultSet();
TSDBResultSet resSet = null;
if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
resSet = new TSDBResultSet(connecter, resultSetPointer);
resSet = new TSDBResultSet(connector, resultSetPointer);
}
return resSet;
}
......@@ -267,7 +269,7 @@ public class TSDBStatement implements Statement {
}
public Connection getConnection() throws SQLException {
if (this.connecter != null)
if (this.connector != null)
return this.connection;
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
......
......@@ -18,7 +18,7 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::1]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
......@@ -26,7 +26,7 @@ def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
......@@ -34,7 +34,7 @@ def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
......@@ -42,7 +42,7 @@ def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
......@@ -50,7 +50,7 @@ def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
......@@ -58,7 +58,7 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)] ]
......@@ -66,7 +66,7 @@ def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
......@@ -74,7 +74,7 @@ def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
......@@ -82,7 +82,7 @@ def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::1]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
......@@ -311,29 +311,24 @@ class CTaosInterface(object):
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def freeResult(result):
CTaosInterface.libtaos.taos_free_result(result)
......
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
import threading
class TDengineCursor(object):
......@@ -35,6 +36,7 @@ class TDengineCursor(object):
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None:
self._connection = connection
......@@ -42,7 +44,7 @@ class TDengineCursor(object):
def __iter__(self):
return self
def next(self):
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
......@@ -137,7 +139,7 @@ class TDengineCursor(object):
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result ), errno)
self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......@@ -148,6 +150,8 @@ class TDengineCursor(object):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
......@@ -180,9 +184,6 @@ class TDengineCursor(object):
return False
def fetchmany(self):
pass
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
......@@ -201,8 +202,6 @@ class TDengineCursor(object):
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
......
......@@ -18,7 +18,7 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::1]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
......@@ -26,7 +26,7 @@ def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
......@@ -34,7 +34,7 @@ def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
......@@ -42,7 +42,7 @@ def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
......@@ -50,7 +50,7 @@ def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
......@@ -58,7 +58,7 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)] ]
......@@ -66,7 +66,7 @@ def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
......@@ -74,7 +74,7 @@ def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
......@@ -82,7 +82,7 @@ def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::1]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
......@@ -111,7 +111,7 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
# except ValueError:
# res.append(None)
# return res
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]]
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::1]]
# else:
# return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]]
......@@ -308,32 +308,48 @@ class CTaosInterface(object):
return fields
# @staticmethod
# def fetchBlock(result, fields):
# pblock = ctypes.c_void_p(0)
# num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
# result, ctypes.byref(pblock))
# if num_of_rows == 0:
# return None, 0
# isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
# blocks = [None] * len(fields)
# fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
# fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
# for i in range(len(fields)):
# data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
# if fields[i]['type'] not in _CONVERT_FUNC:
# raise DatabaseError("Invalid data type returned from database")
# print('====================',fieldLen[i])
# blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
# return blocks, abs(num_of_rows)
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def freeResult(result):
CTaosInterface.libtaos.taos_free_result(result)
......
......@@ -216,7 +216,6 @@ class TDengineCursor(object):
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
......
......@@ -18,7 +18,7 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::1]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
......@@ -26,7 +26,7 @@ def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
......@@ -34,7 +34,7 @@ def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
......@@ -42,7 +42,7 @@ def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
......@@ -50,7 +50,7 @@ def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
......@@ -58,7 +58,7 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
......@@ -66,7 +66,7 @@ def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
......@@ -74,7 +74,7 @@ def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
......@@ -82,7 +82,7 @@ def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::1]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
......@@ -310,27 +310,23 @@ class CTaosInterface(object):
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
......
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
import threading
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
......@@ -32,6 +36,8 @@ class TDengineCursor(object):
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None:
self._connection = connection
......@@ -39,7 +45,7 @@ class TDengineCursor(object):
def __iter__(self):
return self
def next(self):
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
......
......@@ -18,7 +18,7 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::1]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
......@@ -26,7 +26,7 @@ def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
......@@ -34,7 +34,7 @@ def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
......@@ -42,7 +42,7 @@ def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::1]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
......@@ -50,7 +50,7 @@ def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
......@@ -58,7 +58,7 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ]
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::1] ]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
......@@ -66,7 +66,7 @@ def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
......@@ -74,7 +74,7 @@ def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::1] ]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
......@@ -82,7 +82,7 @@ def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::1]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
......@@ -225,6 +225,7 @@ class CTaosInterface(object):
if connection.value == None:
print('connect to TDengine failed')
raise ConnectionError("connect to TDengine failed")
# sys.exit(1)
else:
print('connect to TDengine success')
......@@ -310,27 +311,23 @@ class CTaosInterface(object):
@staticmethod
def fetchBlock(result, fields):
pblock = ctypes.c_void_p(0)
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
result, ctypes.byref(pblock))
if num_of_rows == 0:
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
......
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
import threading
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
......@@ -32,6 +37,8 @@ class TDengineCursor(object):
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None:
self._connection = connection
......
......@@ -17,6 +17,7 @@
#define TDENGINE_TAOS_H
#include <stdint.h>
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
......@@ -109,13 +110,14 @@ DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *tres);
DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
int taos_validate_sql(TAOS *taos, const char *sql);
......
......@@ -847,35 +847,50 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
int32_t startPos = 0;
// tumbling time window query, a special case of sliding time window query
if (pQuery->interval.sliding == pQuery->interval.interval && prevPosition != -1) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
startPos = prevPosition + factor;
} else {
startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
if (startKey < pDataBlockInfo->window.skey && QUERY_IS_ASC_QUERY(pQuery)) {
startPos = 0;
} else if (startKey > pDataBlockInfo->window.ekey && !QUERY_IS_ASC_QUERY(pQuery)) {
startPos = pDataBlockInfo->rows - 1;
} else {
startPos = searchFn((char *)primaryKeys, pDataBlockInfo->rows, startKey, pQuery->order.order);
}
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
if (primaryKeys == NULL) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
assert(pDataBlockInfo->window.skey <= pNext->ekey);
} else {
pNext->ekey += ((next - pNext->ekey + pQuery->interval.sliding - 1)/pQuery->interval.sliding) * pQuery->interval.sliding;
pNext->skey = pNext->ekey - pQuery->interval.interval + 1;
assert(pDataBlockInfo->window.ekey >= pNext->skey);
}
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos];
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
} else {
pNext->skey -= ((pNext->skey - next + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding;
pNext->ekey = pNext->skey + pQuery->interval.interval - 1;
} else {
if (QUERY_IS_ASC_QUERY(pQuery) && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
} else {
pNext->ekey += ((next - pNext->ekey + pQuery->interval.sliding - 1)/pQuery->interval.sliding) * pQuery->interval.sliding;
pNext->skey = pNext->ekey - pQuery->interval.interval + 1;
}
} else if ((!QUERY_IS_ASC_QUERY(pQuery)) && primaryKeys[startPos] < pNext->skey) {
TSKEY next = primaryKeys[startPos];
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
pNext->skey = taosTimeTruncate(next, &pQuery->interval, pQuery->precision);
pNext->ekey = taosTimeAdd(pNext->skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
} else {
pNext->skey -= ((pNext->skey - next + pQuery->interval.sliding - 1) / pQuery->interval.sliding) * pQuery->interval.sliding;
pNext->ekey = pNext->skey + pQuery->interval.interval - 1;
}
}
}
......
Subproject commit f2ffd30521b8e8afbc9d25c75f8eeeb6a48bd030
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = m_alt_db
......@@ -56,6 +56,7 @@ if $rows != 2 then
endi
print data03 = $data03
if $data03 != taos then
print expect taos, actual: $data03
return -1
endi
sql drop table tb
......@@ -113,7 +114,7 @@ endi
sql drop table tb
sql drop table mt
sleep 3000
sleep 1000
### ALTER TABLE WHILE STREAMING [TBASE271]
#sql create table tb1 (ts timestamp, c1 int, c2 nchar(5), c3 int)
#sql create table strm as select count(*), avg(c1), first(c2), sum(c3) from tb1 interval(2s)
......@@ -146,7 +147,7 @@ sleep 3000
#sql alter table tb1 add column c3 int
#sleep 6000
#sql insert into tb1 values (now, 3, 'taos', 3);
#sleep 3000
#sleep 1000
#sql select * from strm
#if $rows != 3 then
# return -1
......@@ -185,7 +186,7 @@ sql create database $db
sql use $db
sql create table mt (ts timestamp, c1 int, c2 nchar(7), c3 int) tags (t1 int)
sql create table tb using mt tags(1)
sleep 3000
sleep 1000
sql insert into tb values ('2018-11-01 16:30:00.000', 1, 'insert', 1)
sql alter table mt drop column c3
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
sql reset query cache
......@@ -87,7 +87,7 @@ if $data13 != NULL then
return -1
endi
sleep 3000
sleep 1000
print ================== insert values into table
sql insert into car1 values (now, 1, 1,1 ) (now +1s, 2,2,2,) car2 values (now, 1,3,3)
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ========== alter_stable.sim
......
......@@ -4,7 +4,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......@@ -212,7 +212,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
sql use $db
#### auto create multiple tables
......
......@@ -4,7 +4,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c ctime -v 30
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
sql drop database if exists ecdb
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ca_db
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ========== columnValues.sim
......
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
####
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
####
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
sleep 3000
sleep 1000
sql connect
sql create database if not exists db
sql use db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxTablesperVnode -v 100
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = sc_db
......@@ -85,10 +85,10 @@ print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 5000
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
print ====== select from table and check num of rows returned
sql use $db
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = m_fl_db
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = fl1_db
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = m_fl_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxTablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = first_db
......@@ -80,7 +80,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/first_last_query.sim
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = first_db
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = impt_db
......@@ -64,7 +64,7 @@ sleep 2000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
sql use $db
sql select * from tb
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c ctime -v 30
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ic_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c ctime -v 30
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ic_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c ctime -v 30
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ic_db
......
......@@ -6,7 +6,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
sleep 3000
sleep 1000
print ======================== dnode1 start
sql create database mul_db
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = intp_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = intp_db
......@@ -303,6 +303,8 @@ $tb = $tbPrefix . 0
return -1
endi
print select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from $tb where ts = $ts0 fill(linear)
sql select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from $tb where ts = $ts0 fill(linear)
if $rows != 1 then
return -1
......@@ -338,6 +340,8 @@ $tb = $tbPrefix . 0
return -1
endi
# columns contain NULL values
print select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from intp_tb3 where ts = $ts0 fill(linear)
sql select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from intp_tb3 where ts = $ts0 fill(linear)
if $rows != 1 then
return -1
......@@ -380,6 +384,7 @@ $tb = $tbPrefix . 0
endi
$t = $tsu + 1000
print select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from $tb where ts = $t fill(linear)
sql select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from $tb where ts = $t fill(linear)
if $rows != 0 then
return -1
......@@ -387,6 +392,7 @@ $tb = $tbPrefix . 0
## fill(value)
$t = $ts0 + 1000
print 91
sql select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from $tb where ts = $t fill(value, -1, -2)
if $rows != 1 then
return -1
......@@ -456,6 +462,7 @@ $tb = $tbPrefix . 0
if $data09 != nchar0 then
return -1
endi
# table has NULL columns
sql select interp(ts), interp(c1), interp(c2), interp(c3), interp(c4), interp(c5), interp(c6), interp(c7), interp(c8), interp(c9) from intp_tb3 where ts = $ts0 fill(value, -1, -2, -3)
if $rows != 1 then
......@@ -491,11 +498,14 @@ $tb = $tbPrefix . 0
##### select interp from stable
## interp(*) from stb
print select interp(*) from $stb where ts = $ts0
sql select interp(*) from $stb where ts = $ts0
if $rows != 1 then
return -1
endi
$t = $ts0 + 1000
print 92
sql select interp(*) from $stb where ts = $t
if $rows != 0 then
return -1
......
......@@ -360,9 +360,7 @@ endi
sql select join_mt1.* from join_mt1
print $rows
$val = 2000
if $rows != $val then
if $rows != 2000 then
return -1
endi
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lr_db
......@@ -66,7 +66,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/lastrow_query.sim
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lr_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm_db
......@@ -66,7 +66,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/limit_tb.sim
run general/parser/limit_stb.sim
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm1_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm1_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm1_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm1_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm2_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm2_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c rowsInFileBlock -v 255
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm2_db
......@@ -69,7 +69,7 @@ print ====== tables created
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
sleep 1000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = lm_db
......
......@@ -5,7 +5,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = mb_db
......
......@@ -5,7 +5,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== dnode1 start
......
......@@ -6,7 +6,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c tableMetaKeepTimer -v 3
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ========== NULL_char.sim
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = group_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 200
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = sc_db
......@@ -122,9 +122,9 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 6000
system sh/exec.sh -n dnode1 -s start
print ====== server restart completed
sleep 3000
sleep 1000
sql connect
sleep 3000
sleep 1000
sql use $db
##### repeat test after server restart
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 5
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = sav_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = scd_db
......@@ -39,7 +39,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
sql use $db
# generate some data in cache
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = sr_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = sr_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = slm_db
......@@ -101,7 +101,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/slimit_query.sim
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = slm_alt_tg_db
......@@ -60,7 +60,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/slimit1_query.sim
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = slm_alt_tg_db
......
......@@ -4,7 +4,7 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = slm_alt_tg_db
......@@ -175,7 +175,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
sql use $db
### repeat above queries
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = slm_db
......
......@@ -6,7 +6,7 @@ system sh/cfg.sh -n dnode1 -c monitor -v 1
system sh/cfg.sh -n dnode1 -c monitorInterval -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
print ======================== stream_on_sys.sim
......
......@@ -3,9 +3,9 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
sleep 3000
sleep 1000
$db = dytag_db
$tbNum = 10
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$db = tf_db
......
......@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ti_db
......
sleep 3000
sleep 1000
sql connect
$dbPrefix = ti_db
......
sleep 2000
sleep 1000
run general/parser/alter.sim
sleep 2000
sleep 1000
run general/parser/alter1.sim
sleep 2000
sleep 1000
run general/parser/alter_stable.sim
sleep 2000
sleep 1000
run general/parser/auto_create_tb.sim
sleep 2000
sleep 1000
run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
sleep 1000
run general/parser/col_arithmetic_operation.sim
sleep 2000
sleep 1000
run general/parser/columnValue.sim
sleep 2000
sleep 1000
run general/parser/commit.sim
sleep 2000
sleep 1000
run general/parser/create_db.sim
sleep 2000
sleep 1000
run general/parser/create_mt.sim
sleep 2000
sleep 1000
run general/parser/create_tb.sim
sleep 2000
sleep 1000
run general/parser/dbtbnameValidate.sim
sleep 2000
sleep 1000
run general/parser/fill.sim
sleep 2000
sleep 1000
run general/parser/fill_stb.sim
sleep 2000
sleep 1000
#run general/parser/fill_us.sim #
sleep 2000
sleep 1000
run general/parser/first_last.sim
sleep 2000
sleep 1000
run general/parser/import_commit1.sim
sleep 2000
sleep 1000
run general/parser/import_commit2.sim
sleep 2000
sleep 1000
run general/parser/import_commit3.sim
sleep 2000
sleep 1000
#run general/parser/import_file.sim
sleep 2000
sleep 1000
run general/parser/insert_tb.sim
sleep 2000
sleep 1000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
sleep 1000
run general/parser/interp.sim
sleep 2000
sleep 1000
run general/parser/lastrow.sim
sleep 2000
sleep 1000
run general/parser/limit.sim
sleep 2000
sleep 1000
run general/parser/limit1.sim
sleep 2000
sleep 1000
run general/parser/limit1_tblocks100.sim
sleep 2000
sleep 1000
run general/parser/limit2.sim
sleep 2000
sleep 1000
run general/parser/mixed_blocks.sim
sleep 2000
sleep 1000
run general/parser/nchar.sim
sleep 2000
sleep 1000
run general/parser/null_char.sim
sleep 2000
sleep 1000
run general/parser/selectResNum.sim
sleep 2000
sleep 1000
run general/parser/select_across_vnodes.sim
sleep 2000
sleep 1000
run general/parser/select_from_cache_disk.sim
sleep 2000
sleep 1000
run general/parser/set_tag_vals.sim
sleep 2000
sleep 1000
run general/parser/single_row_in_tb.sim
sleep 2000
sleep 1000
run general/parser/slimit.sim
sleep 2000
sleep 1000
run general/parser/slimit1.sim
sleep 2000
sleep 1000
run general/parser/slimit_alter_tags.sim
sleep 2000
sleep 1000
run general/parser/tbnameIn.sim
sleep 2000
sleep 1000
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 2000
sleep 1000
run general/parser/join.sim
sleep 2000
sleep 1000
run general/parser/join_multivnode.sim
sleep 2000
sleep 1000
run general/parser/projection_limit_offset.sim
sleep 2000
sleep 1000
run general/parser/select_with_tags.sim
sleep 2000
sleep 1000
run general/parser/groupby.sim
sleep 2000
sleep 1000
run general/parser/tags_filter.sim
sleep 2000
sleep 1000
run general/parser/topbot.sim
sleep 2000
sleep 1000
run general/parser/union.sim
sleep 2000
sleep 1000
run general/parser/constCol.sim
sleep 2000
sleep 1000
run general/parser/where.sim
sleep 2000
sleep 1000
run general/parser/timestamp.sim
sleep 2000
sleep 1000
run general/parser/sliding.sim
#sleep 2000
#sleep 1000
#run general/parser/repeatStream.sim
#sleep 2000
#sleep 1000
#run general/parser/stream_on_sys.sim
#sleep 2000
#sleep 1000
#run general/parser/stream.sim
\ No newline at end of file
......@@ -5,7 +5,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = ts_db
......@@ -59,10 +59,10 @@ run general/parser/timestamp_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
sleep 1000
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 3000
sleep 1000
run general/parser/timestamp_query.sim
sleep 3000
sleep 1000
sql connect
$dbPrefix = ts_db
......
......@@ -5,7 +5,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sleep 1000
sql connect
$dbPrefix = wh_db
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册