提交 097c99b6 编写于 作者: H Hui Li

[sync master]

上级 615410ab
......@@ -31,7 +31,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
MESSAGE(STATUS "build version ${VERSION_INFO}")
SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${VERSION_INFO} SOVERSION 1)
ELSEIF (TD_WINDOWS_64)
ELSEIF (TD_WINDOWS_64 OR TD_WINDOWS_32)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
......
......@@ -197,7 +197,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type
char intervalTimeUnit;
char slidingTimeUnit;
int64_t etime, stime;
int64_t intervalTime; // aggregation time interval
......
......@@ -142,8 +142,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
* Method: consumeImp
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
*/
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
(JNIEnv *, jobject, jlong, jint);
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......
......@@ -606,7 +606,7 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in
return rowobj;
}
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) {
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniGetGlobalMethod(env);
......@@ -616,38 +616,14 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
int64_t start = taosGetTimestampMs();
int count = 0;
while (true) {
TAOS_RES * res = taos_consume(tsub);
TAOS_RES *res = taos_consume(tsub);
if (res == NULL) {
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
return NULL;
}
TAOS_FIELD *fields = taos_fetch_fields(res);
int num_fields = taos_num_fields(res);
while (true) {
TAOS_ROW row = taos_fetch_row(res);
if (row == NULL) {
break;
}
jobject rowobj = convert_one_row(env, row, fields, num_fields);
(*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj);
count++;
}
if (count > 0) {
break;
}
if (timeout == -1) {
continue;
}
if (((int)(taosGetTimestampMs() - start)) >= timeout) {
jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub);
break;
}
}
return rows;
return res;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) {
......
......@@ -26,6 +26,7 @@
#include "tutil.h"
#include "tnote.h"
extern void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -347,8 +348,8 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
(*pSql->fp)(pSql->param, taosres, code);
if (shouldFree) {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed in async res", pSql);
tscFreeSqlObj(pSql);
}
}
......@@ -494,6 +495,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if(pMeterMetaInfo->pMeterMeta == NULL) {
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
}
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
......@@ -504,11 +510,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql, 0);
pRes->code = code;
......@@ -553,7 +554,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pSql->pStream) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/*
* NOTE:
* transfer the sql function for super table query before get meter/metric meta,
......@@ -561,6 +561,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if ((UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)
&& ( pMeterMetaInfo->pMeterMeta == NULL
|| pMeterMetaInfo->pMetricMeta == NULL
|| pMeterMetaInfo->pMetricMeta->numOfMeters == 0
|| pMeterMetaInfo->pMetricMeta->numOfVnodes == 0))
|| (!(UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) && (pMeterMetaInfo->pMeterMeta == NULL))) {
tscTrace("%p stream:%p meta is updated, but no table, clear meter meta and set next launch new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
tscClearMeterMetaInfo(pMeterMetaInfo, false);
tscSetNextLaunchTimer(pSql->pStream, pSql);
return;
}
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else {
......
......@@ -20,6 +20,7 @@
#include "thistogram.h"
#include "tinterpolation.h"
#include "tlog.h"
#include "tpercentile.h"
#include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h"
#include "tscompression.h"
......@@ -27,7 +28,6 @@
#include "ttime.h"
#include "ttypes.h"
#include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
......@@ -3669,7 +3669,7 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
wchar_t accept[3] = {towupper(c), towlower(c), 0};
while (1) {
size_t n = wcsspn(str, accept);
size_t n = wcscspn(str, accept);
str += n;
if (str[0] == 0 || (n >= size - 1)) {
......@@ -3678,7 +3678,7 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
str++;
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo);
int32_t ret = WCSPatternMatch(&patterStr[i], str, twcslen(str), pInfo);
if (ret != TSDB_PATTERN_NOMATCH) {
return ret;
}
......@@ -4104,8 +4104,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo));
}
// pCtx->numOfIteratedElems += notNullElems;
}
static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -4138,7 +4136,6 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
pInfo->lastKey = primaryKey[index];
setTWALastVal(pCtx, pData, 0, pInfo);
// pCtx->numOfIteratedElems += 1;
pResInfo->hasResult = DATA_SET_FLAG;
if (pResInfo->superTableQ) {
......@@ -4403,10 +4400,8 @@ static double do_calc_rate(const SRateInfo* pRateInfo) {
}
}
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
duration = (duration + 500) / 1000;
double resultVal = ((double)diff) / duration;
double duration = (pRateInfo->lastKey - pRateInfo->firstKey) / 1000.0;
double resultVal = diff / duration;
pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f resultVal:%f",
pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal);
......@@ -4448,6 +4443,30 @@ static void rate_function(SQLFunctionCtx *pCtx) {
pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
if (pCtx->order == TSQL_SO_ASC) {
#ifdef NOT_EQUINIX
// prev interpolation exists
if (pCtx->prev.key != -1) {
pRateInfo->firstValue = pCtx->prev.data;
pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1; // clear the flag
pTrace("%p get prev interpolation for firstValue:%f firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
if (-DBL_MAX == pRateInfo->lastValue) {
pRateInfo->lastValue = pCtx->prev.data;
pRateInfo->lastKey = pCtx->prev.key;
} else if (pCtx->prev.data < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
pRateInfo->lastValue = pCtx->prev.data;
pRateInfo->lastKey = pCtx->prev.key;
pTrace("lastValue:%f lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
}
#endif
for (int32_t i = 0; i < pCtx->size; ++i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
......@@ -4504,6 +4523,111 @@ static void rate_function(SQLFunctionCtx *pCtx) {
assert(pCtx->size == notNullElems);
}
#ifdef NOT_EQUINIX
if (pCtx->next.key != -1) {
if (pCtx->next.data < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
pTrace("%p get next interpolation for lastValue:%f lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey);
}
#endif
} else {
#ifdef NOT_EQUINIX
if (pCtx->next.key != -1) {
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1;
pTrace("%p get next interpolation for lastValue:%f lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey);
if (-DBL_MAX == pRateInfo->firstValue) {
pRateInfo->firstValue = pCtx->next.data;
pRateInfo->firstKey = pCtx->next.key;
} else if (pCtx->next.data > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += pCtx->next.data;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
pRateInfo->firstValue = pCtx->next.data;
pRateInfo->firstKey = pCtx->next.key;
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
}
#endif
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
notNullElems++;
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (double)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (double)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (double)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)GET_INT64_VAL(pData);
break;
case TSDB_DATA_TYPE_FLOAT:
v = (double)GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = (double)GET_DOUBLE_VAL(pData);
break;
default:
assert(0);
}
if ((-DBL_MAX == pRateInfo->lastValue) || (INT64_MIN == pRateInfo->lastKey)) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if (-DBL_MAX == pRateInfo->firstValue) {
pRateInfo->firstValue = v;
} else if (v > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += v;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%f firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
#ifdef NOT_EQUINIX
if (pCtx->prev.key != -1) {
if (pCtx->prev.data > pRateInfo->firstValue) {
pRateInfo->CorrectionValue += pCtx->prev.data;
pTrace("CorrectionValue:%f", pRateInfo->CorrectionValue);
}
pRateInfo->firstValue = pCtx->prev.data;
pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1;
pTrace("%p get prev interpolation for firstValue:%f firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
}
#endif
};
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
......@@ -4637,7 +4761,7 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) {
pTrace("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%f lastValue:%f CorrectionValue:%f hasResult:%d",
pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
if (pRateInfo->hasResult != DATA_SET_FLAG) {
if ((pRateInfo->hasResult != DATA_SET_FLAG) || (INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey)) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return;
}
......@@ -4669,6 +4793,16 @@ static void irate_function(SQLFunctionCtx *pCtx) {
return;
}
#ifdef NOT_EQUINIX
// next interpolation exists
if (pCtx->next.key != -1) {
pRateInfo->lastValue = pCtx->next.data;
pRateInfo->lastKey = pCtx->next.key;
pCtx->next.key = -1; // clear the flag
pTrace("%p irate_function() get next interpolation for lastValue:%f lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey);
}
#endif
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
......@@ -4702,8 +4836,7 @@ static void irate_function(SQLFunctionCtx *pCtx) {
assert(0);
}
// TODO: calc once if only call this function once ????
if ((INT64_MIN == pRateInfo->lastKey) || (-DBL_MAX == pRateInfo->lastValue)) {
if (1 == notNullElems) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
......@@ -4711,14 +4844,23 @@ static void irate_function(SQLFunctionCtx *pCtx) {
continue;
}
if ((INT64_MIN == pRateInfo->firstKey) || (-DBL_MAX == pRateInfo->firstValue)){
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("%p irate_function() firstValue:%f firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
break;
}
#ifdef NOT_EQUINIX
if (pCtx->prev.key != -1) {
if ((INT64_MIN == pRateInfo->firstKey) || (-DBL_MAX == pRateInfo->firstValue)) {
pRateInfo->firstValue = pCtx->prev.data;
pRateInfo->firstKey = pCtx->prev.key;
pCtx->prev.key = -1;
pTrace("%p irate_function() get prev interpolation for firstValue:%f firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
}
}
#endif
SET_VAL(pCtx, notNullElems, 1);
......@@ -4803,6 +4945,10 @@ static void do_sumrate_merge(SQLFunctionCtx *pCtx) {
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
} else if (pInput->num == 0) {
if ((INT64_MIN == pInput->lastKey) || (INT64_MIN == pInput->firstKey)) {
continue;
}
pRateInfo->sum += do_calc_rate(pInput);
pRateInfo->num++;
} else {
......@@ -4843,6 +4989,11 @@ static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
if (pRateInfo->num == 0) {
// from meter
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey)) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return;
}
*(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo);
} else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) {
*(double*)pCtx->aOutputBuf = pRateInfo->sum;
......
......@@ -434,6 +434,19 @@ static void tscProcessServStatus(SSqlObj *pSql) {
if (pObj->pHb->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return;
} else {
int32_t* data = (int32_t*) pObj->pHb->res.data;
if (data != NULL) {
int32_t totalDnode = data[0];
int32_t onlineDnode = data[1];
assert(onlineDnode <= totalDnode);
if (onlineDnode < totalDnode) {
pSql->res.code = TSDB_CODE_NETWORK_UNAVAIL;
return;
}
}
}
} else {
if (pSql->res.code == TSDB_CODE_NETWORK_UNAVAIL) {
......
......@@ -254,7 +254,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
if (pToken->type == TK_NULL) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
((strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)
|| (strncasecmp("nan", pToken->z, pToken->n) == 0)
|| (strncasecmp("-nan", pToken->z, pToken->n) == 0))) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else {
double dv;
......@@ -279,7 +281,9 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
if (pToken->type == TK_NULL) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
((strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)
|| (strncasecmp("nan", pToken->z, pToken->n) == 0)
|| (strncasecmp("-nan", pToken->z, pToken->n) == 0))) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else {
double dv;
......@@ -1434,6 +1438,9 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
pTableDataBlock->size = sizeof(SShellSubmitBlock);
pTableDataBlock->rowSize = pMeterMeta->rowSize;
code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
if (TSDB_CODE_SUCCESS != code) return -1;
numOfRows += pSql->res.numOfRows;
pSql->res.numOfRows = 0;
count = 0;
......
......@@ -292,7 +292,7 @@ void tscKillConnection(STscObj *pObj) {
pthread_mutex_unlock(&pObj->mutex);
taos_close(pObj);
tscTrace("connection:%p is killed", pObj);
taos_close(pObj);
}
......@@ -282,6 +282,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg2 = "name too long";
SCreateDBInfo* pCreateDB = &(pInfo->pDCLInfo->dbOpt);
pCmd->existsCheck = pInfo->pDCLInfo->existsCheck;
if (tscValidateName(&pCreateDB->dbname) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -598,9 +599,6 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
pQueryInfo->intervalTime = pQueryInfo->intervalTime / 1000;
}
/* parser has filter the illegal type, no need to check here */
pQueryInfo->intervalTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
// interval cannot be less than 10 milliseconds
if (pQueryInfo->intervalTime < tsMinIntervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
......@@ -689,8 +687,13 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
pQueryInfo->slidingTimeUnit = pQuerySql->sliding.z[pQuerySql->sliding.n - 1];
} else {
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
// parser has filter the illegal type, no need to check here
pQueryInfo->slidingTimeUnit = pQuerySql->interval.z[pQuerySql->interval.n - 1];
}
return TSDB_CODE_SUCCESS;
......@@ -1636,13 +1639,16 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
// set the first column ts for diff query
if (optr == TK_DIFF) {
colIdx += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, TSDB_KEYSIZE);
SColumnList ids = getColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].aName,
pExpr);
} else if ((optr >= TK_RATE) && (optr <= TK_AVG_IRATE)) {
SColumnIndex index1 = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &index1);
}
// functions can not be applied to tags
......@@ -1940,6 +1946,7 @@ static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SSQLToken
if (strncasecmp(pSchema[i].name, pToken->z, pToken->n) == 0) {
columnIndex = i;
break;
}
}
......@@ -2588,7 +2595,7 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn
tVariantDump(&pRight->val, (char*)pColumnFilter->pz, colType);
size_t len = wcslen((wchar_t*)pColumnFilter->pz);
size_t len = twcslen((wchar_t*)pColumnFilter->pz);
pColumnFilter->len = len * TSDB_NCHAR_SIZE;
} else {
tVariantDump(&pRight->val, (char*)&pColumnFilter->lowerBndd, colType);
......@@ -4650,6 +4657,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
if (pMeterMetaInfo->pMeterMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfMeters == 0) {
tscTrace("%p no table in metricmeta, no output result", pSql);
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
pSql->res.qhandle = 0x1; // to pass the qhandle check;
}
// keep original limitation value in globalLimit
......@@ -5621,7 +5629,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return doLocalQueryProcess(pQueryInfo, pQuerySql);
}
if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) {
if (pQuerySql->from->nExpr > 2) { // not allowed more than 2 table join
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
......
......@@ -446,6 +446,8 @@ int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t *result) {
break;
case 'a':
break;
case 'u':
return 0;
default: {
;
return -1;
......@@ -813,6 +815,7 @@ void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBI
pInfo->pDCLInfo->dbOpt.dbname = *pToken;
tTokenListAppend(pInfo->pDCLInfo, pIgExists);
pInfo->pDCLInfo->existsCheck = (pIgExists->n == 1);
}
void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo) {
......
......@@ -231,12 +231,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
idx += 1;
}
}
assert(idx >= pReducer->numOfBuffer);
if (idx == 0) {
free(pReducer);
return;
}
pReducer->numOfBuffer = idx;
pReducer->numOfBuffer = idx; // the actual entries that has result for merge
SCompareParam *param = malloc(sizeof(SCompareParam));
param->pLocalData = pReducer->pLocalDataSrc;
......@@ -324,7 +325,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
......@@ -613,6 +614,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
if (pSchema == NULL) {
tfree(*pMemBuffer);
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code;
......@@ -634,15 +636,27 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
}
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
if (pModel == NULL) {
goto _error_memory;
}
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
if ((*pMemBuffer)[i] == NULL) {
for (int32_t j=0; j < i; ++j ) {
destroyExtMemBuffer((*pMemBuffer)[j]);
}
goto _error_memory;
}
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code;
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
destroyExtMemBuffer((*pMemBuffer)[i]);
}
goto _error_memory;
}
// final result depends on the fields number
......@@ -685,6 +699,13 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
tfree(pSchema);
return TSDB_CODE_SUCCESS;
_error_memory:
tfree(pSchema);
tfree(*pMemBuffer);
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code;
}
/**
......@@ -698,7 +719,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
destroyColumnModel(pFinalModel);
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
pMemBuffer[i] = destroyExtMemBuffer(pMemBuffer[i]);
}
tfree(pMemBuffer);
......@@ -779,7 +800,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
......@@ -923,7 +944,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
pQueryInfo->slidingTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
......@@ -1275,7 +1296,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime,
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize);
......@@ -1305,7 +1326,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey =
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p);
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......@@ -1338,7 +1359,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
pQueryInfo->slidingTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......
......@@ -106,12 +106,12 @@ static int32_t tscGetMgmtConnMaxRetryTimes() {
return tscMgmtIpList.numOfIps * factor;
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
int32_t tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return;
if (pObj == NULL) return TSDB_CODE_APP_ERROR;
if (pObj != pObj->signature) {
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
return;
return TSDB_CODE_APP_ERROR;
}
SSqlObj *pSql = pObj->pHb;
......@@ -128,11 +128,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->queryId) tscKillQuery(pObj, pRsp->queryId);
if (pRsp->streamId) tscKillStream(pObj, pRsp->streamId);
}
if (pRes->data == NULL) {
pRes->data = calloc(2, sizeof(int32_t));
}
((int32_t*)pRes->data)[0] = htonl(pRsp->totalDnodes);
((int32_t*)pRes->data)[1] = htonl(pRsp->onlineDnodes);
} else {
tscTrace("heart beat failed, code:%d", code);
}
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return code;
}
void tscProcessActivityTimer(void *handle, void *tmrId) {
......@@ -398,9 +406,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
//taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
tscFreeSqlObj(pSql);
return ahandle;
return NULL;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
......@@ -600,8 +608,8 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
taos_close(pObj);
tscTrace("%p Async sql close failed connection", pSql);
} else {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql);
tscFreeSqlObj(pSql);
}
}
}
......@@ -710,15 +718,21 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS;
void *asyncFp = pSql->fp;
if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB ||
pCmd->command == TSDB_SQL_META || pCmd->command == TSDB_SQL_METRIC) {
tscBuildMsg[pCmd->command](pSql, NULL);
code = tscBuildMsg[pCmd->command](pSql, NULL);
}
int32_t code = tscSendMsgToServer(pSql);
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
return code;
}
code = tscSendMsgToServer(pSql);
if (asyncFp) {
if (code != TSDB_CODE_SUCCESS) {
......@@ -994,7 +1008,13 @@ int tscLaunchSTableSubqueries(SSqlObj *pSql) {
SRetrieveSupport* pSupport = pSub->param;
tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
int code = tscProcessSql(pSub);
if (code != TSDB_CODE_SUCCESS) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
pRes->code = code;
return pRes->code;
}
}
return TSDB_CODE_SUCCESS;
......@@ -1681,7 +1701,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime);
pQueryMsg->intervalTimeUnit = pQueryInfo->intervalTimeUnit;
pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime);
if (pQueryInfo->intervalTime < 0) {
......@@ -2148,7 +2168,12 @@ int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SDropUserMsg);
pCmd->payloadLen = pMsg - pStart;
if (pInfo->type == TSDB_SQL_DROP_ACCT) {
pCmd->msgType = TSDB_MSG_TYPE_DROP_ACCT;
} else {
pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2697,7 +2722,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p build load multi-metermeta msg completed, numOfMeters:%d, msg size:%d", pSql, pCmd->count,
pCmd->payloadLen);
return pCmd->payloadLen;
return TSDB_CODE_SUCCESS;
}
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
......@@ -2924,7 +2949,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCmd->msgType = TSDB_MSG_TYPE_HEARTBEAT;
assert(msgLen + minMsgSize() <= size);
return msgLen;
return TSDB_CODE_SUCCESS;
}
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
......@@ -3684,7 +3709,8 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
SMeterMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
SMeterMeta *pMeterMeta = (SMeterMeta *)taosGetDataFromExists(tscCacheHandle, pQueryInfo->pMeterInfo[i]->pMeterMeta);
assert(pMeterMeta != NULL);
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
}
......
......@@ -321,6 +321,9 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if(NULL == pQueryInfo){
return NULL;
}
return pQueryInfo->fieldsInfo.pFields;
}
......@@ -726,6 +729,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
#if 0
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -765,6 +769,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
return nRows;
#endif
(*rows) = taos_fetch_row(res);
return ((*rows) != NULL)? 1:0;
}
int taos_select_db(TAOS *taos, const char *db) {
......@@ -796,8 +805,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (pSql->fp != NULL) {
pSql->thandle = NULL;
tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else if (keepCmd) {
tscFreeSqlResult(pSql);
} else {
......
......@@ -28,7 +28,7 @@
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql);
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer);
static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) {
......@@ -97,6 +97,18 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
return;
}
if ((UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)
&& ( pMeterMetaInfo->pMeterMeta == NULL
|| pMeterMetaInfo->pMetricMeta == NULL
|| pMeterMetaInfo->pMetricMeta->numOfMeters == 0
|| pMeterMetaInfo->pMetricMeta->numOfVnodes == 0))
|| (!(UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) && (pMeterMetaInfo->pMeterMeta == NULL))) {
tscTrace("%p no table in metricmeta, no launch query", pSql);
tscClearMeterMetaInfo(pMeterMetaInfo, false);
tscSetNextLaunchTimer(pStream, pSql);
return;
}
tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pMeterMetaInfo->name);
tscProcessSql(pStream->pSql);
......@@ -323,7 +335,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
}
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t timer = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
......@@ -374,8 +386,7 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
}
static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
int64_t minIntervalTime = tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
......@@ -391,8 +402,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
}
int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
int64_t minSlidingTime = tsMinSlidingTime;
if (pQueryInfo->slidingTime == -1) {
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
......@@ -582,10 +592,10 @@ void taos_close_stream(TAOS_STREAM *handle) {
tscRemoveFromStreamList(pStream, pSql);
taosTmrStopA(&(pStream->pTimer));
tscTrace("%p stream:%p is closed", pSql, pStream);
tscFreeSqlObj(pSql);
pStream->pSql = NULL;
tscTrace("%p stream:%p is closed", pSql, pStream);
tfree(pStream);
}
}
......@@ -104,6 +104,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
return NULL;
}
char* sqlstr = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -114,7 +115,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->signature = pSql;
pSql->pTscObj = pObj;
char* sqlstr = (char*)malloc(strlen(sql) + 1);
sqlstr = (char*)malloc(strlen(sql) + 1);
if (sqlstr == NULL) {
tscError("failed to allocate sql string for subscription");
goto failed;
......
......@@ -87,6 +87,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
MD5Update(&ctx, (uint8_t*)tmp, keyLen);
char* pStr = base64_encode(ctx.digest, tListLen(ctx.digest));
strcpy(str, pStr);
free(pStr);
}
free(tmp);
......@@ -456,9 +457,11 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
pCmd->command = 0;
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
if (pObj->signature == pObj) {
pthread_mutex_lock(&pObj->mutex);
tfree(pSql->sqlstr);
pthread_mutex_unlock(&pObj->mutex);
}
tscFreeSqlResult(pSql);
tfree(pSql->pSubs);
......@@ -2009,7 +2012,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
// create the fields info from the sql functions
SColumnList columnList = {.num = 1};
SColumnList columnList = {.num = 0};
for(int32_t k = 0; k < numOfOutputCols; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, indexList[k]);
......@@ -2063,7 +2066,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pMeterMetaInfo->tagColumnIndex);
}
assert(pFinalInfo->pMeterMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (pFinalInfo->pMeterMeta == NULL) {
tscError("%p new subquery failed for get pMeterMeta is NULL from cache", pSql);
tscFreeSqlObj(pNew);
return NULL;
}
assert(pNewQueryInfo->numOfTables == 1);
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL);
}
......
......@@ -37,7 +37,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.version>1.7</java.version>
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
......
......@@ -84,6 +84,14 @@ public class TSDBConnection implements Connection {
}
}
public TSDBSubscribe createSubscribe() throws SQLException {
if (!this.connector.isClosed()) {
return new TSDBSubscribe(this.connector);
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
}
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (!this.connector.isClosed()) {
return new TSDBPreparedStatement(this.connector, sql);
......
......@@ -22,8 +22,8 @@ public class TSDBJNIConnector {
static volatile Boolean isInitialized = false;
static {
System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
System.loadLibrary("taos");
}
/**
......@@ -261,31 +261,31 @@ public class TSDBJNIConnector {
/**
* Subscribe to a table in TSDB
*/
public long subscribe(String host, String user, String password, String database, String table, long time, int period) {
return subscribeImp(host, user, password, database, table, time, period);
public long subscribe(String topic, String sql, boolean restart, int period) {
return subscribeImp(this.taos, restart, topic, sql, period);
}
private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period);
public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
/**
* Consume a subscribed table
*/
public TSDBResultSetRowData consume(long subscription) {
public long consume(long subscription) {
return this.consumeImp(subscription);
}
private native TSDBResultSetRowData consumeImp(long subscription);
private native long consumeImp(long subscription);
/**
* Unsubscribe a table
*
* @param subscription
*/
public void unsubscribe(long subscription) {
unsubscribeImp(subscription);
public void unsubscribe(long subscription, boolean isKeep) {
unsubscribeImp(subscription, isKeep);
}
private native void unsubscribeImp(long subscription);
private native void unsubscribeImp(long subscription, boolean isKeep);
/**
* Validate if a <I>create table</I> sql statement is correct without actually creating that table
......@@ -293,7 +293,7 @@ public class TSDBJNIConnector {
public boolean validateCreateTableSql(String sql) {
long connection = taos;
int res = validateCreateTableSqlImp(connection, sql.getBytes());
return res != 0 ? false : true;
return res == 0;
}
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
......
/***************************************************************************
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*****************************************************************************/
package com.taosdata.jdbc;
import javax.management.OperationsException;
import java.sql.SQLException;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.*;
public class TSDBSubscribe {
private TSDBJNIConnector connecter = null;
private static ScheduledExecutorService pool;
private static Map<Long, TSDBTimerTask> timerTaskMap = new ConcurrentHashMap<>();
private static Map<Long, ScheduledFuture> scheduledMap = new ConcurrentHashMap();
private static class TimerInstance {
private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1);
}
public static ScheduledExecutorService getTimerInstance() {
return TimerInstance.instance;
}
public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException {
if (null != connecter) {
this.connecter = connecter;
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
}
/**
* sync subscribe
*
* @param topic
* @param sql
* @param restart
* @param period
* @throws SQLException
*/
public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (period < 1000) {
throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES));
}
return this.connecter.subscribe(topic, sql, restart, period);
}
/**
* async subscribe
*
* @param topic
* @param sql
* @param restart
* @param period
* @param callBack
* @throws SQLException
*/
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
final long subscription = this.connecter.subscribe(topic, sql, restart, period);
if (null != callBack) {
pool = getTimerInstance();
TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack);
timerTaskMap.put(subscription, timerTask);
ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS);
scheduledMap.put(subscription, scheduledFuture);
}
return subscription;
}
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (0 == subscription) {
throw new OperationsException("Invalid use of consume");
}
long resultSetPointer = this.connecter.consume(subscription);
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
return null;
} else {
return new TSDBResultSet(this.connecter, resultSetPointer);
}
}
/**
* cancel subscribe
*
* @param subscription
* @param isKeep
* @throws SQLException
*/
public void unsubscribe(long subscription, boolean isKeep) throws SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (null != timerTaskMap.get(subscription)) {
synchronized (timerTaskMap.get(subscription)) {
while (1 == timerTaskMap.get(subscription).getState()) {
try {
Thread.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
timerTaskMap.get(subscription).setState(2);
if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) {
timerTaskMap.get(subscription).cancel();
timerTaskMap.remove(subscription);
scheduledMap.get(subscription).cancel(false);
scheduledMap.remove(subscription);
}
this.connecter.unsubscribe(subscription, isKeep);
}
} else {
this.connecter.unsubscribe(subscription, isKeep);
}
}
class TSDBTimerTask extends TimerTask {
private long subscription;
private TSDBSubscribeCallBack callBack;
// 0: not running 1: running 2: cancel
private int state = 0;
public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) {
this.subscription = subscription;
this.callBack = callBack;
}
public int getState() {
return this.state;
}
public void setState(int state) {
this.state = state;
}
@Override
public void run() {
synchronized (this) {
if (2 == state) {
return;
}
state = 1;
try {
TSDBResultSet resultSet = consume(subscription);
callBack.invoke(resultSet);
} catch (Exception e) {
this.cancel();
throw new RuntimeException(e);
}
state = 0;
}
}
}
}
/***************************************************************************
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*****************************************************************************/
package com.taosdata.jdbc;
public interface TSDBSubscribeCallBack {
void invoke(TSDBResultSet resultSet);
}
import com.taosdata.jdbc.*;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
public class TestAsyncTSDBSubscribe {
public static void main(String[] args) {
String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " +
"-tname tableName -h host";
if (args.length < 2) {
System.err.println(usage);
return;
}
String dbName = "";
String tName = "";
String host = "localhost";
String topic = "";
for (int i = 0; i < args.length; i++) {
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
dbName = args[++i];
}
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
tName = args[++i];
}
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
}
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
System.err.println(usage);
return;
}
Connection connection = null;
TSDBSubscribe subscribe = null;
long subscribId = 0;
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties);
String rawSql = "select * from " + tName + ";";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second"));
int a = 0;
Thread.sleep(2000);
subscribe.unsubscribe(subscribId, true);
System.err.println("cancel subscribe");
} catch (Exception e) {
e.printStackTrace();
}
}
private static class CallBack implements TSDBSubscribeCallBack {
private String name = "";
public CallBack(String name) {
this.name = name;
}
@Override
public void invoke(TSDBResultSet resultSet) {
try {
while (null !=resultSet && resultSet.next()) {
System.out.print("callback_" + name + ": ");
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
System.out.printf(i + ": " + resultSet.getString(i) + "\t");
}
System.out.println();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
......@@ -11,8 +11,14 @@ public class TestPreparedStatement {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117");
Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties);
Connection connection = DriverManager.getConnection("jdbc:TAOS://10.211.55.3:0/log?user=root&password=taosdata", properties);
String createSql = "create table t (ts timestamp, speed int);";
Statement statement = connection.createStatement();
statement.executeQuery(createSql);
String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY";
if (1 < 2) {
return;
}
// String[] params = new String[]{"ts", "c1"};
PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql);
ResultSet resSet = pstmt.executeQuery();
......
import com.taosdata.jdbc.ColumnMetaData;
import com.taosdata.jdbc.DatabaseMetaDataResultSet;
import com.taosdata.jdbc.TSDBResultSetRowData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class TestTSDBResultSetRowData {
public static void main(String[] args) throws SQLException {
DatabaseMetaDataResultSet resultSet = new DatabaseMetaDataResultSet();
List<ColumnMetaData> columnMetaDataList = new ArrayList(1);
ColumnMetaData colMetaData = new ColumnMetaData();
colMetaData.setColIndex(0);
colMetaData.setColName("TABLE_TYPE");
colMetaData.setColSize(10);
colMetaData.setColType(8);
columnMetaDataList.add(colMetaData);
List<TSDBResultSetRowData> rowDataList = new ArrayList(2);
TSDBResultSetRowData rowData = new TSDBResultSetRowData(2);
rowData.setString(0, "TABLE");
rowDataList.add(rowData);
rowData = new TSDBResultSetRowData(2);
rowData.setString(0, "STABLE");
rowDataList.add(rowData);
resultSet.setColumnMetaDataList(columnMetaDataList);
resultSet.setRowDataList(rowDataList);
while (resultSet.next()) {
System.out.println(resultSet.getString(1));
}
}
}
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
public class TestTSDBSubscribe {
public static void main(String[] args) throws Exception {
String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " +
"-topic topicName -tname tableName -h host";
if (args.length < 2) {
System.err.println(usage);
return;
}
String dbName = "";
String tName = "";
String host = "localhost";
String topic = "";
for (int i = 0; i < args.length; i++) {
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
dbName = args[++i];
}
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
tName = args[++i];
}
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
}
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
System.err.println(usage);
return;
}
Connection connection = null;
TSDBSubscribe subscribe = null;
long subscribId = 0;
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata"
, properties);
String rawSql = "select * from " + tName + ";";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
int a = 0;
while (true) {
Thread.sleep(900);
TSDBResultSet resSet = subscribe.consume(subscribId);
while (resSet.next()) {
for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
System.out.printf(i + ": " + resSet.getString(i) + "\t");
}
System.out.println("\n======" + a + "==========");
}
a++;
if (a >= 10) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != subscribe && 0 != subscribId) {
subscribe.unsubscribe(subscribId, true);
}
if (null != connection) {
connection.close();
}
}
}
}
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
continue
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......
......@@ -105,7 +105,7 @@ extern SSdbPeer *sdbPeer[];
#endif
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, char keyType, char *directory,
void *(*appTool)(char, void *, char *, int, int *));
void *sdbGetRow(void *handle, void *key);
......
......@@ -439,9 +439,15 @@ typedef struct SSqlFuncExprMsg {
} SSqlFuncExprMsg;
typedef struct SSqlBinaryExprInfo {
union {
struct tSQLBinaryExpr *pBinExpr; /* for binary expression */
int64_t resvSpace0;
};
int32_t numOfCols; /* binary expression involves the readed number of columns*/
union {
SColIndexEx * pReqColumns; /* source column list */
int64_t resvSpace1;
};
} SSqlBinaryExprInfo;
typedef struct SSqlFunctionExpr {
......@@ -482,7 +488,11 @@ typedef struct SColumnInfo {
int16_t type;
int16_t bytes;
int16_t numOfFilters;
union {
SColumnFilterInfo *filters;
int64_t resvSpace;
};
} SColumnInfo;
/*
......@@ -513,7 +523,7 @@ typedef struct {
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
char intervalTimeUnit; // time interval type, for revisement of interval(1d)
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
int64_t intervalTime; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window
......@@ -811,6 +821,8 @@ typedef struct {
typedef struct {
uint32_t queryId;
uint32_t streamId;
uint32_t totalDnodes;
uint32_t onlineDnodes;
char killConnection;
SIpList ipList;
} SHeartBeatRsp;
......
......@@ -136,7 +136,7 @@ tExtMemBuffer *createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnMo
* @param pMemBuffer
* @return
*/
void *destoryExtMemBuffer(tExtMemBuffer *pMemBuffer);
void *destroyExtMemBuffer(tExtMemBuffer *pMemBuffer);
/**
* @param pMemBuffer
......
......@@ -30,7 +30,7 @@ typedef struct SInterpolationInfo {
char * prevValues; // previous row of data
char * nextValues; // next row of data
int32_t numOfTags;
char ** pTags; // tags value for current interoplation
char ** pTags; // tags value for current interpolation
} SInterpolationInfo;
typedef struct SPoint {
......@@ -83,6 +83,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoTyp
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point);
#ifdef __cplusplus
}
#endif
......
......@@ -103,8 +103,8 @@ extern "C" {
#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb
#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 16
#define TSDB_MAX_TAGS_LEN 512
#define TSDB_MAX_TAGS 32
#define TSDB_MAX_TAGS_LEN 2048
#define TSDB_MAX_TAGS 128
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
......@@ -133,7 +133,7 @@ extern "C" {
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size
#define TSDB_DEFAULT_PAYLOAD_SIZE 4096 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_SQLCMD_SIZE 1024
#define TSDB_MAX_VNODES 256
......
......@@ -167,6 +167,11 @@ typedef struct SExtTagsInfo {
struct SQLFunctionCtx **pTagCtxList;
} SExtTagsInfo;
typedef struct SBoundaryData {
TSKEY key;
double data;
} SBoundaryData;
// sql function runtime context
typedef struct SQLFunctionCtx {
int32_t startOffset;
......@@ -195,6 +200,8 @@ typedef struct SQLFunctionCtx {
SResultInfo *resultInfo;
SExtTagsInfo tagInfo;
SBoundaryData prev; // this value may be less or equalled to the start time of time window
SBoundaryData next; // this value may be greater or equalled to the end time of time window
} SQLFunctionCtx;
typedef struct SQLAggFuncElem {
......
......@@ -130,6 +130,7 @@ extern "C" {
#define POW2(x) ((x) * (x))
size_t twcslen(const wchar_t *wcs);
int32_t strdequote(char *src);
void strtrim(char *src);
......
......@@ -19,7 +19,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME ${DB_CLIENT_NAME})
ELSEIF (TD_WINDOWS_64)
ELSEIF (TD_WINDOWS_64 OR TD_WINDOWS_32)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/regex)
LIST(APPEND SRC ./src/shellEngine.c)
......
......@@ -818,7 +818,8 @@ void shellGetGrantInfo(void *con) {
if (code == TSDB_CODE_OPS_NOT_SUPPORT) {
fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con));
} else {
fprintf(stderr, "Failed to check Server Edition, Reason:%d:%s\n\n", taos_errno(con), taos_errstr(con));
//fprintf(stderr, "Failed to check Server Edition, Reason:%d:%s\n\n", taos_errno(con), taos_errstr(con));
fprintf(stdout, "Server is Enterprise Edition, version is %s\n\n", taos_get_server_info(con));
}
return;
}
......
......@@ -141,6 +141,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
if (wordexp(fptr, &full_path, 0) != 0) {
fprintf(stderr, "ERROR: illegal file name\n");
free(cmd);
return;
}
......@@ -166,6 +167,7 @@ static void shellSourceFile(TAOS *con, char *fptr) {
if (f == NULL) {
fprintf(stderr, "ERROR: failed to open file %s\n", fname);
wordfree(&full_path);
free(cmd);
return;
}
......
......@@ -27,6 +27,7 @@
#include <unistd.h>
#include <wordexp.h>
#include <iconv.h>
#include <time.h>
#include "taos.h"
#include "taosmsg.h"
......@@ -162,6 +163,7 @@ static struct argp_option options[] = {
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is "DB_COMPANY".", 0},
{"port", 'P', "PORT", 0, "Port to connect", 0},
{"cversion", 'v', "CVERION", 0, "client version", 0},
{"mysqlFlag", 'q', "MYSQLFLAG", 0, "mysqlFlag, Default is 0", 0},
// input/output file
{"outpath", 'o', "OUTPATH", 0, "Output file path.", 1},
{"inpath", 'i', "INPATH", 0, "Input file path.", 1},
......@@ -189,6 +191,7 @@ struct arguments {
char *password;
uint16_t port;
char cversion[TSDB_FILENAME_LEN+1];
uint16_t mysqlFlag;
// output file
char outpath[TSDB_FILENAME_LEN+1];
char inpath[TSDB_FILENAME_LEN+1];
......@@ -236,6 +239,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'P':
arguments->port = atoi(arg);
break;
case 'q':
arguments->mysqlFlag = atoi(arg);
break;
case 'v':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid client vesion %s\n", arg);
......@@ -325,6 +331,7 @@ int taosDumpOut(struct arguments *arguments);
int taosDumpIn(struct arguments *arguments);
void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp);
int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon);
int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon);
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp);
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp);
int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon);
......@@ -340,6 +347,7 @@ struct arguments tsArguments = {
DB_COMPANY,
0,
"",
0,
// outpath and inpath
"",
"",
......@@ -384,6 +392,7 @@ int main(int argc, char *argv[]) {
printf("password: %s\n", tsArguments.password);
printf("port: %u\n", tsArguments.port);
printf("cversion: %s\n", tsArguments.cversion);
printf("mysqlFlag: %d", tsArguments.mysqlFlag);
printf("outpath: %s\n", tsArguments.outpath);
printf("inpath: %s\n", tsArguments.inpath);
printf("encode: %s\n", tsArguments.encode);
......@@ -616,7 +625,7 @@ int taosDumpOut(struct arguments *arguments) {
int32_t count = 0;
STableRecordInfo tableRecordInfo;
char tmpBuf[TSDB_FILENAME_LEN+1] = {0};
char tmpBuf[TSDB_FILENAME_LEN+9] = {0};
if (arguments->outpath[0] != 0) {
sprintf(tmpBuf, "%s/dbs.sql", arguments->outpath);
} else {
......@@ -917,6 +926,8 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
dbInfo->ablocks, dbInfo->tblocks, dbInfo->ctime, dbInfo->clog, dbInfo->comp);
}
pstr += sprintf(pstr, ";");
fprintf(fp, "%s\n\n", tmpCommand);
free(tmpCommand);
}
......@@ -927,7 +938,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
STableRecord tableRecord;
int fd;
char tmpFileName[TSDB_FILENAME_LEN + 1] = {0};
char tmpFileName[TSDB_FILENAME_LEN + 128] = {0};
sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
......@@ -936,7 +947,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
}
FILE *fp = NULL;
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) {
sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
......@@ -1235,7 +1246,7 @@ void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp) {
}
}
pstr += sprintf(pstr, ")");
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
......@@ -1288,7 +1299,7 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
/* } */
}
pstr += sprintf(pstr, ")");
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
......@@ -1358,15 +1369,32 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
return -1;
}
char sqlStr[8] = "\0";
if (arguments->mysqlFlag) {
sprintf(sqlStr, "INSERT");
} else {
sprintf(sqlStr, "IMPORT");
}
int rowFlag = 0;
count = 0;
while ((row = taos_fetch_row(tmpResult)) != NULL) {
pstr = tmpBuffer;
if (count == 0) {
pstr += sprintf(pstr, "IMPORT INTO %s VALUES (", tbname);
pstr += sprintf(pstr, "%s INTO %s VALUES (", sqlStr, tbname);
} else {
if (arguments->mysqlFlag) {
if (0 == rowFlag) {
pstr += sprintf(pstr, "(");
rowFlag++;
} else {
pstr += sprintf(pstr, ", (");
}
} else {
pstr += sprintf(pstr, "(");
}
}
for (int col = 0; col < numFields; col++) {
if (col != 0) pstr += sprintf(pstr, ", ");
......@@ -1409,12 +1437,22 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
pstr += sprintf(pstr, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (!arguments->mysqlFlag) {
pstr += sprintf(pstr, "%" PRId64 "", *(int64_t *)row[col]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[col]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
pstr += sprintf(pstr, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
break;
default:
break;
}
}
pstr += sprintf(pstr, ") ");
totalRows++;
......@@ -1422,7 +1460,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
fprintf(fp, "%s", tmpBuffer);
if (count >= arguments->data_batch) {
fprintf(fp, "\n");
fprintf(fp, ";\n");
count = 0;
} //else {
//fprintf(fp, "\\\n");
......
......@@ -152,9 +152,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (row[i]!= NULL){
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name);
memcpy(target + len, (char *) row[i], fields[i].bytes);
len = strlen(target);
}
break;
default:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-");
......
......@@ -75,6 +75,8 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) {
unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token);
if (base64)
free(base64);
return false;
}
if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) {
......
......@@ -68,9 +68,7 @@ bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) {
}
/*
* for async test
* /
// for async test
/*
if (httpCheckUsedbSql(sql)) {
httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB);
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_PLATFORM_LINUX_H
#define TDENGINE_PLATFORM_LINUX_H
#ifndef TDENGINE_PLATFORM_DARWIN_H
#define TDENGINE_PLATFORM_DARWIN_H
#ifdef __cplusplus
extern "C" {
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
IF (TD_WINDOWS_64)
IF (TD_WINDOWS_64 OR TD_WINDOWS_32)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
......
......@@ -142,8 +142,11 @@ extern "C" {
#define atomic_exchange_16(ptr, val) _InterlockedExchange16((short volatile*)(ptr), (short)(val))
#define atomic_exchange_32(ptr, val) _InterlockedExchange((long volatile*)(ptr), (long)(val))
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#if (_MSC_VER == 1800)
#define atomic_exchange_ptr(ptr, val) InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#else
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#endif
#ifdef _TD_GO_DLL_
#define atomic_val_compare_exchange_8 __sync_val_compare_and_swap
#else
......
......@@ -93,7 +93,11 @@ long interlocked_add_fetch_32(long volatile* ptr, long val) {
}
__int64 interlocked_add_fetch_64(__int64 volatile* ptr, __int64 val) {
#ifdef _WIN64
return _InterlockedExchangeAdd64(ptr, val) + val;
#else
return _InterlockedExchangeAdd(ptr, val) + val;
#endif
}
// and
......@@ -377,9 +381,29 @@ int fsendfile(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) {
return writeLen;
}
unsigned char _MyBitScanForward64(unsigned long *ret, uint64_t x) {
unsigned long x0 = (unsigned long)x, top, bottom;
_BitScanForward(&top, (unsigned long)(x >> 32));
_BitScanForward(&bottom, x0);
*ret = x0 ? bottom : 32 + top;
return x != 0;
}
unsigned char _MyBitScanReverse64(unsigned long *ret, uint64_t x) {
unsigned long x1 = (unsigned long)(x >> 32), top, bottom;
_BitScanReverse(&top, x1);
_BitScanReverse(&bottom, (unsigned long)x);
*ret = x1 ? top + 32 : bottom;
return x != 0;
}
int32_t BUILDIN_CLZL(uint64_t val) {
unsigned long r = 0;
#ifdef _WIN64
_BitScanReverse64(&r, val);
#else
_MyBitScanReverse64(&r, val);
#endif
return (int)(r >> 3);
}
......@@ -391,7 +415,11 @@ int32_t BUILDIN_CLZ(uint32_t val) {
int32_t BUILDIN_CTZL(uint64_t val) {
unsigned long r = 0;
#ifdef _WIN64
_BitScanForward64(&r, val);
#else
_MyBitScanForward64(&r, val);
#endif
return (int)(r >> 3);
}
......
......@@ -12,7 +12,7 @@ ELSEIF (TD_DARWIN_64)
LIST(APPEND SRC ./src/trpc.c)
LIST(APPEND SRC ./src/tstring.c)
LIST(APPEND SRC ./src/tudp.c)
ELSEIF (TD_WINDOWS_64)
ELSEIF (TD_WINDOWS_64 OR TD_WINDOWS_32)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
LIST(APPEND SRC ./src/thaship.c)
LIST(APPEND SRC ./src/trpc.c)
......
......@@ -1184,9 +1184,9 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
taosTmrReset(taosProcessIdleTimer, pServer->idleTime, pConn, pChann->tmrCtrl, &pConn->pIdleTimer);
}
if (code == TSDB_CODE_ALREADY_PROCESSED) {
tTrace("%s cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
if (code == TSDB_CODE_ALREADY_PROCESSED || code == TSDB_CODE_LAST_SESSION_NOT_FINISHED) {
tTrace("%s code:%d, cid:%d sid:%d id:%s, %s wont be processed, source:0x%08x dest:0x%08x tranId:%d pConn:%p", pServer->label,
code, chann, sid, pHeader->meterId, taosMsg[pHeader->msgType], pHeader->sourceId, htonl(pHeader->destId),
pHeader->tranId, pConn);
free(data);
return pConn;
......
......@@ -206,17 +206,20 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
free(pTcp);
return NULL;
}
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
free(pTcp);
return NULL;
}
pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
free(pTcp);
return NULL;
}
......@@ -226,6 +229,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)) != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
free(pTcp);
return NULL;
}
......
......@@ -389,6 +389,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
free(pServerObj);
return NULL;
}
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
......@@ -401,17 +402,23 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label);
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
......@@ -419,6 +426,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
......@@ -430,6 +439,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
......
......@@ -127,7 +127,7 @@ typedef struct {
} SMnodeStatus;
typedef struct {
uint8_t dbId;
char dbId;
char type;
uint64_t version;
short dataLen;
......
......@@ -289,7 +289,7 @@ sdb_exit1:
return -1;
}
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, char keyType, char *directory,
void *(*appTool)(char, void *, char *, int, int *)) {
SSdbTable *pTable = (SSdbTable *)malloc(sizeof(SSdbTable));
if (pTable == NULL) return NULL;
......@@ -310,7 +310,7 @@ void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType,
pTable->appTool = appTool;
sprintf(pTable->fn, "%s/%s.db", directory, pTable->name);
if (sdbInitIndexFp[keyType] != NULL) pTable->iHandle = (*sdbInitIndexFp[keyType])(maxRows, sizeof(SRowMeta));
if (sdbInitIndexFp[(int)keyType] != NULL) pTable->iHandle = (*sdbInitIndexFp[(int)keyType])(maxRows, sizeof(SRowMeta));
pthread_mutex_init(&pTable->mutex, NULL);
......
......@@ -176,9 +176,9 @@ typedef struct _user_obj {
char writeAuth : 1;
char reserved[16];
char updateEnd[1];
struct _user_obj *prev, *next;
int32_t authAllowTime;
int32_t authFailCount;
struct _user_obj *prev, *next;
} SUserObj;
typedef struct {
......@@ -432,6 +432,8 @@ bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes);
extern int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int (*mgmtRetrieveFp[])(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
......
......@@ -261,7 +261,7 @@ typedef struct SQuery {
TSKEY ekey;
int64_t intervalTime;
int64_t slidingTime; // sliding time for sliding window query
char intervalTimeUnit; // interval data type, used for daytime revise
char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
int16_t interpoType;
......
......@@ -85,12 +85,6 @@ typedef enum {
QUERY_NO_DATA_TO_CHECK = 0x8u,
} vnodeQueryStatus;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SBlockInfo {
TSKEY keyFirst;
TSKEY keyLast;
......@@ -285,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index);
#ifdef __cplusplus
}
......
......@@ -141,6 +141,12 @@ typedef struct SWindowResInfo {
int64_t threshold; // threshold for return completed results.
} SWindowResInfo;
typedef struct SPointInterpoSupporter {
int32_t numOfCols;
char** pPrevPoint;
char** pNextPoint;
} SPointInterpoSupporter;
typedef struct SQueryRuntimeEnv {
SPositionInfo startPos; /* the start position, used for secondary/third iteration */
SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */
......@@ -172,6 +178,10 @@ typedef struct SQueryRuntimeEnv {
bool stableQuery; // is super table query or not
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool hasTimeWindow;
char** lastRowInBlock;
bool interpoSearch;
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime
......
......@@ -567,6 +567,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
pMetric = mgmtGetMeter(pTagData);
if (pMetric == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
free(pMeter);
return TSDB_CODE_INVALID_TABLE;
}
......
......@@ -1203,6 +1203,10 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
pConn->streamId = 0;
pHBRsp->killConnection = pConn->killConnection;
mgmtGetDnodeOnlineNum(&pHBRsp->totalDnodes, &pHBRsp->onlineDnodes);
pHBRsp->totalDnodes = htonl(pHBRsp->totalDnodes);
pHBRsp->onlineDnodes = htonl(pHBRsp->onlineDnodes);
if (pConn->usePublicIp) {
if (pSdbPublicIpList != NULL) {
int size = pSdbPublicIpList->numOfIps * 4;
......@@ -1350,9 +1354,9 @@ _rsp:
pMsg += sizeof(STaosRsp);
pConnectRsp = (SConnectRsp *)pRsp->more;
if (NULL != pConn->pAcct) {
if (code == 0) {
sprintf(pConnectRsp->acctId, "%x", pConn->pAcct->acctId);
}
strcpy(pConnectRsp->version, version);
pConnectRsp->writeAuth = pConn->writeAuth;
pConnectRsp->superAuth = pConn->superAuth;
......@@ -1378,8 +1382,7 @@ _rsp:
// set the time resolution: millisecond or microsecond
*((uint32_t *)pMsg) = tsTimePrecision;
pMsg += sizeof(uint32_t);
if (code != 0) {
} else {
pConnectRsp->writeAuth = 0;
pConnectRsp->superAuth = 0;
pConn->pAcct = NULL;
......
......@@ -577,7 +577,7 @@ static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
const wchar_t* pattern = pRight;
const wchar_t* str = pLeft;
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
int32_t ret = WCSPatternMatch(pattern, str, twcslen(str), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
}
......@@ -711,7 +711,6 @@ static int32_t mgmtFilterMeterByIndex(STabObj* pMetric, tQueryResultset* pRes, c
// failed to build expression, no result, return immediately
if (pExpr == NULL) {
mError("metric:%s, no result returned, error in super table query expression:%s", pMetric->meterId, pCond);
tfree(pCond);
return TSDB_CODE_OPS_NOT_SUPPORT;
} else { // query according to the binary expression
......
......@@ -184,6 +184,12 @@ int mgmtGetUserMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "account");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
......@@ -230,6 +236,10 @@ int mgmtRetrieveUsers(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
*(int64_t *)pWrite = pUser->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pUser->acct);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
......
......@@ -413,7 +413,7 @@ void vnodeRemoveFile(int vnode, int fileId) {
vnodeGetDnameFromLname(headName, dataName, lastName, dHeadName, dDataName, dLastName);
int fd = open(headName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd > 0) {
if (fd >= 0) {
vnodeGetHeadFileHeaderInfo(fd, &headInfo);
atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), -headInfo.totalStorage);
close(fd);
......
......@@ -269,7 +269,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr *
pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->interpoType = pQueryMsg->interpoType;
pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit;
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQInfo->query.pointsToRead = vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock;
......@@ -496,6 +496,9 @@ void vnodeDecRefCount(void *param) {
assert(vnodeIsQInfoValid(pQInfo));
int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1);
if (ref < 0) {
return; // avoid two threads dec ref count
}
assert(ref >= 0);
dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref);
......@@ -649,18 +652,28 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
}
STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj));
if (pSupporter == NULL) {
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
pSupporter->numOfMeters = 1;
pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false);
taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
(char *)&pMetersObj[0], POINTER_BYTES);
pSupporter->pSidSet = NULL;
pSupporter->subgroupIdx = -1;
pSupporter->pMeterSidExtInfo = NULL;
pQInfo->pTableQuerySupporter = pSupporter;
pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false);
if (pSupporter->pMetersHashTable == NULL) {
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error;
}
if (taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
(char *)&pMetersObj[0], POINTER_BYTES) != 0) {
*code = TSDB_CODE_APP_ERROR;
goto _error;
}
STSBuf *pTSBuf = NULL;
if (pQueryMsg->tsLen > 0) {
// open new file to save the result
......
......@@ -24,6 +24,8 @@
#include "vnodeUtil.h"
#include "vnodeStatus.h"
#include <dirent.h>
int tsMaxVnode = -1;
int tsOpenVnodes = 0;
SVnodeObj *vnodeList = NULL;
......
......@@ -67,6 +67,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen) {
* Lite version has no message header, so minus one
*/
SSchedMsg schedMsg;
schedMsg.tfp = NULL;
schedMsg.fp = vnodeProcessMsgFromMgmtSpec;
schedMsg.msg = msg - 1;
schedMsg.ahandle = NULL;
......
......@@ -46,3 +46,8 @@ int mgmtProcessDropAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
int mgmtProcessCreateAcctMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
}
void mgmtGetDnodeOnlineNum(int32_t *totalDnodes, int32_t *onlineDnodes) {
*totalDnodes = 1;
*onlineDnodes = 1;
}
\ No newline at end of file
......@@ -28,7 +28,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
MESSAGE(STATUS "Failed to find iconv, use default encoding method")
ENDIF ()
ENDIF ()
ELSEIF (TD_WINDOWS_64)
ELSEIF (TD_WINDOWS_64 OR TD_WINDOWS_32)
ADD_DEFINITIONS(-DUSE_LIBICONV)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/iconv)
......
......@@ -587,6 +587,8 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
"size:%" PRId64 " bytes, collision:%d",
pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size,
pObj->totalSize, pObj->statistics.numOfCollision);
} else {
pError("key:%s failed to added into cache, out of memory", key);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L);
......
......@@ -769,7 +769,7 @@ int tsDecompressTimestampImp(const char *const input, const int nelements, char
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(&dd1 + LONG_BYTES - nbytes, input + ipos, nbytes);
memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
......@@ -794,7 +794,7 @@ int tsDecompressTimestampImp(const char *const input, const int nelements, char
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(&dd2 + LONG_BYTES - nbytes, input + ipos, nbytes);
memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
......
......@@ -41,11 +41,13 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
char *tmpDir = "/tmp/";
#endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%llu-%u");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1));
strcat(tmpPath, "-%d-%llu-%u-%llu");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
}
/*
......@@ -80,7 +82,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, SColumnMo
return pMemBuffer;
}
void* destoryExtMemBuffer(tExtMemBuffer *pMemBuffer) {
void* destroyExtMemBuffer(tExtMemBuffer *pMemBuffer) {
if (pMemBuffer == NULL) {
return NULL;
}
......@@ -914,6 +916,7 @@ void tColModelDisplay(SColumnModel *pModel, void *pData, int32_t numOfRows, int3
char buf[4096] = {0};
taosUcs4ToMbs(val, pModel->pFields[j].field.bytes, buf);
printf("%s\t", buf);
break;
}
case TSDB_DATA_TYPE_BINARY: {
printBinaryData(val, pModel->pFields[j].field.bytes);
......@@ -965,6 +968,7 @@ void tColModelDisplayEx(SColumnModel *pModel, void *pData, int32_t numOfRows, in
char buf[128] = {0};
taosUcs4ToMbs(val, pModel->pFields[j].field.bytes, buf);
printf("%s\t", buf);
break;
}
case TSDB_DATA_TYPE_BINARY: {
printBinaryDataEx(val, pModel->pFields[j].field.bytes, &param[j]);
......
......@@ -156,11 +156,11 @@ char tsSocketType[4] = "udp";
// time precision, millisecond by default
int tsTimePrecision = TSDB_TIME_PRECISION_MILLI;
// 10 ms for sliding time, the value will changed in case of time precision changed
int tsMinSlidingTime = 10;
// 1 us for sliding time, the value will changed in case of time precision changed
int tsMinSlidingTime = 1;
// 10 ms for interval time range, changed accordingly
int tsMinIntervalTime = 10;
// 1 us for interval time range, changed accordingly
int tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int tsMaxStreamComputDelay = 20000;
......@@ -632,10 +632,10 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "minSlidingTime", &tsMinSlidingTime, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
10, 1000000, 0, TSDB_CFG_UTYPE_MS);
1, 1000000000, 0, TSDB_CFG_UTYPE_MS);
tsInitConfigOption(cfg++, "minIntervalTime", &tsMinIntervalTime, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
10, 1000000, 0, TSDB_CFG_UTYPE_MS);
1, 1000000000, 0, TSDB_CFG_UTYPE_MS);
tsInitConfigOption(cfg++, "maxStreamCompDelay", &tsMaxStreamComputDelay, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
10, 1000000000, 0, TSDB_CFG_UTYPE_MS);
......
......@@ -22,12 +22,12 @@
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) {
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char slidingTimeUnit, int16_t precision) {
if (timeRange == 0) {
return startTime;
}
if (intervalTimeUnit == 'a' || intervalTimeUnit == 'm' || intervalTimeUnit == 's' || intervalTimeUnit == 'h') {
if (slidingTimeUnit == 'a' || slidingTimeUnit == 'm' || slidingTimeUnit == 's' || slidingTimeUnit == 'h' || slidingTimeUnit == 'u') {
return (startTime / timeRange) * timeRange;
} else {
/*
......@@ -95,11 +95,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
}
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) {
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
if (order == TSQL_SO_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision);
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
}
......@@ -191,6 +191,49 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0;
}
int taosDoLinearInterpolationD(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) {
case TSDB_DATA_TYPE_INT: {
*(double*) point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(double*)point->val =
doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)point->val =
doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
*(double*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key);
break;
};
case TSDB_DATA_TYPE_TINYINT: {
*(double*)point->val =
doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
break;
};
default: {
// TODO: Deal with interpolation with bool and strings and timestamp
return -1;
}
}
return 0;
}
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; }
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order,
......
......@@ -77,7 +77,7 @@ void taosUnLockNote(int fd, taosNoteInfo * pNote)
void *taosThreadToOpenNewNote(void *param)
{
char name[NOTE_FILE_NAME_LEN];
char name[NOTE_FILE_NAME_LEN + 16];
taosNoteInfo * pNote = (taosNoteInfo *)param;
pNote->taosNoteFlag ^= 1;
......@@ -170,7 +170,7 @@ void taosGetNoteName(char *fn, taosNoteInfo * pNote)
int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote)
{
char name[NOTE_FILE_NAME_LEN] = "\0";
char name[NOTE_FILE_NAME_LEN + 16] = "\0";
struct stat notestat0, notestat1;
int size;
......
......@@ -32,7 +32,7 @@ tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, i
pBuffer = pSeg->pBuffer[j];
} else {
if (pSeg->pBuffer && pSeg->pBuffer[j]) {
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
pSeg->pBuffer[j] = destroyExtMemBuffer(pSeg->pBuffer[j]);
}
}
}
......@@ -338,7 +338,7 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
for (int32_t j = 0; j < pSeg->numOfSlots; ++j) {
if (pSeg->pBuffer[j] != NULL) {
pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]);
pSeg->pBuffer[j] = destroyExtMemBuffer(pSeg->pBuffer[j]);
}
}
tfree(pSeg->pBuffer);
......@@ -588,7 +588,7 @@ void releaseBucket(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) {
return;
}
pSeg->pBuffer[slotIdx] = destoryExtMemBuffer(pSeg->pBuffer[slotIdx]);
pSeg->pBuffer[slotIdx] = destroyExtMemBuffer(pSeg->pBuffer[slotIdx]);
}
////////////////////////////////////////////////////////////////////////////////////////////
......@@ -853,7 +853,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt];
for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) {
if (pSeg->pBuffer && pSeg->pBuffer[ttx]) {
pSeg->pBuffer[ttx] = destoryExtMemBuffer(pSeg->pBuffer[ttx]);
pSeg->pBuffer[ttx] = destroyExtMemBuffer(pSeg->pBuffer[ttx]);
}
}
}
......
......@@ -392,6 +392,8 @@ static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* resu
break;
case 'a':
break;
case 'u':
return 0;
default: {
;
return -1;
......
......@@ -510,7 +510,7 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
/* here is the 1a/2s/3m/9y */
if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' ||
z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') &&
z[i] == 'Y' || z[i] == 'W' || z[i] == 'u' || z[i] == 'U') &&
(isIdChar[(uint8_t)z[i + 1]] == 0)) {
*tokenType = TK_VARIABLE;
i += 1;
......
......@@ -197,7 +197,7 @@ int32_t tVariantToString(tVariant *pVar, char *dst) {
case TSDB_DATA_TYPE_NCHAR: {
dst[0] = '\'';
taosUcs4ToMbs(pVar->wpz, (wcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
int32_t len = strlen(dst);
dst[len] = '\'';
dst[len + 1] = 0;
......@@ -430,7 +430,7 @@ static int32_t toNchar(tVariant *pVariant, char **pDest, int32_t *pDestSize) {
}
pVariant->wpz = pWStr;
*pDestSize = wcslen(pVariant->wpz);
*pDestSize = twcslen(pVariant->wpz);
// shrink the allocate memory, no need to check here.
char* tmp = realloc(pVariant->wpz, (*pDestSize + 1)*TSDB_NCHAR_SIZE);
......
......@@ -27,6 +27,23 @@
#include "tlog.h"
#include "taoserror.h"
size_t twcslen(const wchar_t *wcs) {
int *wstr = (int *)wcs;
if (NULL == wstr) {
return 0;
}
size_t n = 0;
while (1) {
if (0 == *wstr++) {
break;
}
n++;
}
return n;
}
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......
char version[64] = "1.6.6.1";
char compatible_version[64] = "1.6.0.0";
char gitinfo[128] = "feea817446e25ff1ef77cefaeeda08f45564d6bc";
char gitinfoOfInternal[128] = "5c736133836e7a9f757216539986ddf746439a11";
char buildinfo[512] = "Built by root at 2020-03-13 16:27";
char version[64] = "2.0.2.3";
char compatible_version[64] = "2.0.0.0";
char gitinfo[128] = "615410abe06a3a5354385760ce9f46fb954a67c5";
char gitinfoOfInternal[128] = "be258cbbe8a9bc3a3da785d451479e46e9c8d03e";
char buildinfo[512] = "Built by root at 2020-05-27 14:12";
void libtaos_1_6_6_1_Linux_x64() {};
void libtaos_2_0_2_3_Linux_x64() {};
......@@ -108,7 +108,6 @@ public class DataGenerator {
}
private static void getDataInOneFile(String path, int rowsPerDevice, int num, int humidityDistRadius, int tempDistRadius) throws IOException {
DecimalFormat df = new DecimalFormat("0.0000");
long startTime = dataStartTime;
FileWriter fw = new FileWriter(new File(path));
......@@ -135,13 +134,13 @@ public class DataGenerator {
for (int j = 0; j < rowsPerDevice; ++j) {
int humidity = (int) humidityDataGen.next();
double temp = tempDataGen.next();
int temp = (int) tempDataGen.next();
int deviceGroup = deviceId % 100;
StringBuffer sb = new StringBuffer();
sb.append(deviceId).append(" ").append(tagPrefix).append(deviceId).append(" ").append(deviceGroup)
.append(" ").append(dataStartTime).append(" ").append(humidity).append(" ")
.append(df.format(temp));
.append(temp);
bw.write(sb.toString());
bw.write("\n");
......
PROJECT(TDengine)
IF (TD_WINDOWS_64)
IF (TD_WINDOWS_64 OR TD_WINDOWS_32)
INCLUDE_DIRECTORIES(${TD_ROOT_DIR}/deps/pthread)
ENDIF ()
......
......@@ -19,7 +19,7 @@ import (
"time"
"log"
"fmt"
_ "taosSql"
_ "github.com/taosdata/driver-go/taosSql"
)
func main() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册