提交 480a71be 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1413

...@@ -172,6 +172,7 @@ function install_bin() { ...@@ -172,6 +172,7 @@ function install_bin() {
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -182,6 +183,7 @@ function install_bin() { ...@@ -182,6 +183,7 @@ function install_bin() {
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || : [ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
[ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || : [ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || : [ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
[ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || :
......
...@@ -84,8 +84,9 @@ function install_main_path() { ...@@ -84,8 +84,9 @@ function install_main_path() {
function install_bin() { function install_bin() {
# Remove links # Remove links
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
if [ "$osType" == "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
fi fi
${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -94,8 +95,9 @@ function install_bin() { ...@@ -94,8 +95,9 @@ function install_bin() {
#Make link #Make link
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || : [ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
if [ "$osType" == "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || : [ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
fi fi
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
......
...@@ -84,8 +84,9 @@ function install_main_path() { ...@@ -84,8 +84,9 @@ function install_main_path() {
function install_bin() { function install_bin() {
# Remove links # Remove links
${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/power || :
if [ "$osType" == "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/powerdemo || : ${csudo} rm -f ${bin_link_dir}/powerdemo || :
${csudo} rm -f ${bin_link_dir}/powerdump || :
fi fi
${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/rmpower || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -94,8 +95,9 @@ function install_bin() { ...@@ -94,8 +95,9 @@ function install_bin() {
#Make link #Make link
[ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || : [ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || :
if [ "$osType" == "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || : [ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || :
[ -x ${install_main_dir}/bin/powerdump ] && ${csudo} ln -s ${install_main_dir}/bin/powerdump ${bin_link_dir}/powerdump || :
fi fi
[ -x ${install_main_dir}/bin/remove_client_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client_power.sh ${bin_link_dir}/rmpower || : [ -x ${install_main_dir}/bin/remove_client_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client_power.sh ${bin_link_dir}/rmpower || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
......
...@@ -172,6 +172,7 @@ function install_bin() { ...@@ -172,6 +172,7 @@ function install_bin() {
${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/power || :
${csudo} rm -f ${bin_link_dir}/powerd || : ${csudo} rm -f ${bin_link_dir}/powerd || :
${csudo} rm -f ${bin_link_dir}/powerdemo || : ${csudo} rm -f ${bin_link_dir}/powerdemo || :
${csudo} rm -f ${bin_link_dir}/powerdump || :
${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/rmpower || :
${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -182,6 +183,7 @@ function install_bin() { ...@@ -182,6 +183,7 @@ function install_bin() {
[ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || : [ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || :
[ -x ${install_main_dir}/bin/powerd ] && ${csudo} ln -s ${install_main_dir}/bin/powerd ${bin_link_dir}/powerd || : [ -x ${install_main_dir}/bin/powerd ] && ${csudo} ln -s ${install_main_dir}/bin/powerd ${bin_link_dir}/powerd || :
[ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || : [ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || :
[ -x ${install_main_dir}/bin/powerdump ] && ${csudo} ln -s ${install_main_dir}/bin/powerdump ${bin_link_dir}/powerdump || :
[ -x ${install_main_dir}/bin/remove_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_power.sh ${bin_link_dir}/rmpower || : [ -x ${install_main_dir}/bin/remove_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_power.sh ${bin_link_dir}/rmpower || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
[ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || :
......
...@@ -92,6 +92,7 @@ function install_bin() { ...@@ -92,6 +92,7 @@ function install_bin() {
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -101,6 +102,7 @@ function install_bin() { ...@@ -101,6 +102,7 @@ function install_bin() {
[ -x ${bin_dir}/taos ] && ${csudo} ln -s ${bin_dir}/taos ${bin_link_dir}/taos || : [ -x ${bin_dir}/taos ] && ${csudo} ln -s ${bin_dir}/taos ${bin_link_dir}/taos || :
[ -x ${bin_dir}/taosd ] && ${csudo} ln -s ${bin_dir}/taosd ${bin_link_dir}/taosd || : [ -x ${bin_dir}/taosd ] && ${csudo} ln -s ${bin_dir}/taosd ${bin_link_dir}/taosd || :
[ -x ${bin_dir}/taosdemo ] && ${csudo} ln -s ${bin_dir}/taosdemo ${bin_link_dir}/taosdemo || : [ -x ${bin_dir}/taosdemo ] && ${csudo} ln -s ${bin_dir}/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${bin_dir}/taosdump ] && ${csudo} ln -s ${bin_dir}/taosdump ${bin_link_dir}/taosdump || :
[ -x ${bin_dir}/set_core.sh ] && ${csudo} ln -s ${bin_dir}/set_core.sh ${bin_link_dir}/set_core || : [ -x ${bin_dir}/set_core.sh ] && ${csudo} ln -s ${bin_dir}/set_core.sh ${bin_link_dir}/set_core || :
} }
......
...@@ -72,6 +72,7 @@ function clean_bin() { ...@@ -72,6 +72,7 @@ function clean_bin() {
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -222,4 +223,4 @@ elif echo $osinfo | grep -qwi "centos" ; then ...@@ -222,4 +223,4 @@ elif echo $osinfo | grep -qwi "centos" ; then
fi fi
echo -e "${GREEN}TDengine is removed successfully!${NC}" echo -e "${GREEN}TDengine is removed successfully!${NC}"
echo echo
\ No newline at end of file
...@@ -38,6 +38,7 @@ function clean_bin() { ...@@ -38,6 +38,7 @@ function clean_bin() {
# Remove link # Remove link
${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
} }
......
...@@ -38,6 +38,7 @@ function clean_bin() { ...@@ -38,6 +38,7 @@ function clean_bin() {
# Remove link # Remove link
${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/power || :
${csudo} rm -f ${bin_link_dir}/powerdemo || : ${csudo} rm -f ${bin_link_dir}/powerdemo || :
${csudo} rm -f ${bin_link_dir}/powerdump || :
${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/rmpower || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
} }
......
...@@ -72,6 +72,7 @@ function clean_bin() { ...@@ -72,6 +72,7 @@ function clean_bin() {
${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/power || :
${csudo} rm -f ${bin_link_dir}/powerd || : ${csudo} rm -f ${bin_link_dir}/powerd || :
${csudo} rm -f ${bin_link_dir}/powerdemo || : ${csudo} rm -f ${bin_link_dir}/powerdemo || :
${csudo} rm -f ${bin_link_dir}/powerdump || :
${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/rmpower || :
${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
...@@ -223,4 +224,4 @@ fi ...@@ -223,4 +224,4 @@ fi
#fi #fi
echo -e "${GREEN}PowerDB is removed successfully!${NC}" echo -e "${GREEN}PowerDB is removed successfully!${NC}"
echo echo
\ No newline at end of file
...@@ -56,7 +56,6 @@ typedef struct SLocalReducer { ...@@ -56,7 +56,6 @@ typedef struct SLocalReducer {
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
int32_t finalRowSize; // final result row size
int32_t status; // denote it is in reduce process, in reduce process, it int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
......
...@@ -64,13 +64,13 @@ ...@@ -64,13 +64,13 @@
} \ } \
} while (0); } while (0);
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ #define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do {\ do { \
for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \
aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \
} \ } \
} while(0); } while (0);
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
...@@ -3624,52 +3624,147 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { ...@@ -3624,52 +3624,147 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) {
return false; return false;
} }
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->lastKey = INT64_MIN; pInfo->lastKey = INT64_MIN;
pInfo->type = pCtx->inputType; pInfo->win = TSWINDOW_INITIALIZER;
return true; return true;
} }
static FORCE_INLINE void setTWALastVal(SQLFunctionCtx *pCtx, const char *data, int32_t i, STwaInfo *pInfo) { static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) {
switch (pCtx->inputType) { int32_t notNullElems = 0;
case TSDB_DATA_TYPE_INT: TSKEY *primaryKey = pCtx->ptsList;
pInfo->iLastValue = GET_INT32_VAL(data + pCtx->inputBytes * i);
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t i = index;
if (pCtx->start.key != INT64_MIN) {
assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN);
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key;
notNullElems++;
i += 1;
} else if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = primaryKey[index];
GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0));
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pInfo->lastKey;
notNullElems++;
i += 1;
}
// calculate the value of
switch(pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
case TSDB_DATA_TYPE_TINYINT: }
pInfo->iLastValue = GET_INT8_VAL(data + pCtx->inputBytes * i); case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
case TSDB_DATA_TYPE_SMALLINT: }
pInfo->iLastValue = GET_INT16_VAL(data + pCtx->inputBytes * i); case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
case TSDB_DATA_TYPE_BIGINT: }
pInfo->iLastValue = GET_INT64_VAL(data + pCtx->inputBytes * i); case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = (double) val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
case TSDB_DATA_TYPE_FLOAT: }
pInfo->dLastValue = GET_FLOAT_VAL(data + pCtx->inputBytes * i); case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
case TSDB_DATA_TYPE_DOUBLE: }
pInfo->dLastValue = GET_DOUBLE_VAL(data + pCtx->inputBytes * i); case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index);
for (; i < size; i++) {
if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) {
continue;
}
pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey);
pInfo->lastValue = val[i];
pInfo->lastKey = primaryKey[i];
}
break; break;
default: }
assert(0); default: assert(0);
}
// the last interpolated time window value
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey);
pInfo->lastValue = pCtx->end.val;
pInfo->lastKey = pCtx->end.key;
} }
pInfo->win.ekey = pInfo->lastKey;
return notNullElems;
} }
static void twa_function(SQLFunctionCtx *pCtx) { static void twa_function(SQLFunctionCtx *pCtx) {
void * data = GET_INPUT_CHAR(pCtx); void * data = GET_INPUT_CHAR(pCtx);
TSKEY *primaryKey = pCtx->ptsList;
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t i = 0;
// skip null value // skip null value
int32_t i = 0;
while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
i++; i++;
} }
...@@ -3678,40 +3773,7 @@ static void twa_function(SQLFunctionCtx *pCtx) { ...@@ -3678,40 +3773,7 @@ static void twa_function(SQLFunctionCtx *pCtx) {
return; return;
} }
if (pInfo->lastKey == INT64_MIN) { int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size);
pInfo->lastKey = pCtx->nStartQueryTimestamp;
setTWALastVal(pCtx, data, i, pInfo);
pInfo->hasResult = DATA_SET_FLAG;
}
notNullElems++;
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey);
} else {
pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey);
}
pInfo->lastKey = primaryKey[i];
setTWALastVal(pCtx, data, i, pInfo);
for (++i; i < pCtx->size; i++) {
if (pCtx->hasNull && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) {
continue;
}
notNullElems++;
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey);
} else {
pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey);
}
pInfo->lastKey = primaryKey[i];
setTWALastVal(pCtx, data, i, pInfo);
}
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
...@@ -3721,8 +3783,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { ...@@ -3721,8 +3783,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo));
} }
// pCtx->numOfIteratedElems += notNullElems;
} }
static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
...@@ -3730,35 +3790,12 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3730,35 +3790,12 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) {
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return; return;
} }
SET_VAL(pCtx, 1, 1); int32_t notNullElems = twa_function_impl(pCtx, index, 1);
SET_VAL(pCtx, notNullElems, 1);
TSKEY *primaryKey = pCtx->ptsList;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->lastKey == INT64_MIN) {
pInfo->lastKey = pCtx->nStartQueryTimestamp;
setTWALastVal(pCtx, pData, 0, pInfo);
pInfo->hasResult = DATA_SET_FLAG;
}
if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
pInfo->dOutput += pInfo->dLastValue * (primaryKey[index] - pInfo->lastKey);
} else {
pInfo->iOutput += pInfo->iLastValue * (primaryKey[index] - pInfo->lastKey);
}
// record the last key/value
pInfo->lastKey = primaryKey[index];
setTWALastVal(pCtx, pData, 0, pInfo);
// pCtx->numOfIteratedElems += 1;
pResInfo->hasResult = DATA_SET_FLAG;
if (pCtx->stableQuery) { if (pCtx->stableQuery) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo));
} }
} }
...@@ -3778,16 +3815,10 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) { ...@@ -3778,16 +3815,10 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) {
} }
numOfNotNull++; numOfNotNull++;
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { pBuf->dOutput += pInput->dOutput;
pBuf->iOutput += pInput->iOutput;
} else { pBuf->win = pInput->win;
pBuf->dOutput += pInput->dOutput;
}
pBuf->SKey = pInput->SKey;
pBuf->EKey = pInput->EKey;
pBuf->lastKey = pInput->lastKey; pBuf->lastKey = pInput->lastKey;
pBuf->iLastValue = pInput->iLastValue;
} }
SET_VAL(pCtx, numOfNotNull, 1); SET_VAL(pCtx, numOfNotNull, 1);
...@@ -3814,21 +3845,17 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3814,21 +3845,17 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
assert(pInfo->EKey >= pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->hasResult != DATA_SET_FLAG) { if (pInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return; return;
} }
if (pInfo->SKey == pInfo->EKey) { if (pInfo->win.ekey == pInfo->win.skey) {
*(double *)pCtx->aOutputBuf = 0; *(double *)pCtx->aOutputBuf = pInfo->lastValue;
} else if (pInfo->type >= TSDB_DATA_TYPE_TINYINT && pInfo->type <= TSDB_DATA_TYPE_BIGINT) {
pInfo->iOutput += pInfo->iLastValue * (pInfo->EKey - pInfo->lastKey);
*(double *)pCtx->aOutputBuf = pInfo->iOutput / (double)(pInfo->EKey - pInfo->SKey);
} else { } else {
pInfo->dOutput += pInfo->dLastValue * (pInfo->EKey - pInfo->lastKey); *(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
*(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->EKey - pInfo->SKey);
} }
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
......
...@@ -198,6 +198,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -198,6 +198,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (numOfFlush == 0 || numOfBuffer == 0) { if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
tscDebug("%p retrieved no data", pSql); tscDebug("%p retrieved no data", pSql);
return; return;
} }
...@@ -330,22 +331,19 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -330,22 +331,19 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel; pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize; pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel; pReducer->finalModel = pFFModel;
assert(pReducer->finalRowSize > 0); if (finalmodel->rowSize > 0) {
if (pReducer->finalRowSize > 0) { pReducer->resColModel->capacity /= finalmodel->rowSize;
pReducer->resColModel->capacity /= pReducer->finalRowSize;
} }
assert(pReducer->finalRowSize <= pReducer->rowSize);
assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
/*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
tfree(pReducer->pTempBuffer); tfree(pReducer->pTempBuffer);
tfree(pReducer->discardData); tfree(pReducer->discardData);
tfree(pReducer->pResultBuf); tfree(pReducer->pResultBuf);
...@@ -920,7 +918,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, ...@@ -920,7 +918,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
} }
memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalReducer->finalRowSize)); memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalReducer->finalModel->rowSize));
pRes->numOfClauseTotal += pRes->numOfRows; pRes->numOfClauseTotal += pRes->numOfRows;
pBeforeFillData->num = 0; pBeforeFillData->num = 0;
...@@ -1256,7 +1254,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur ...@@ -1256,7 +1254,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity); tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) { if (tscIsSecondStageQuery(pQueryInfo)) {
pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalModel->rowSize);
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -1627,7 +1625,8 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) ...@@ -1627,7 +1625,8 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
} }
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize); int32_t maxRowSize = MAX(rowSize, finalRowSize);
char* pbuf = calloc(1, pOutput->num * maxRowSize);
size_t size = tscNumOfFields(pQueryInfo); size_t size = tscNumOfFields(pQueryInfo);
SArithmeticSupport arithSup = {0}; SArithmeticSupport arithSup = {0};
...@@ -1660,7 +1659,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ ...@@ -1660,7 +1659,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
offset += pSup->field.bytes; offset += pSup->field.bytes;
} }
assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * offset); memcpy(pOutput->data, pbuf, pOutput->num * offset);
tfree(pbuf); tfree(pbuf);
......
...@@ -4248,7 +4248,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE ...@@ -4248,7 +4248,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
tExprTreeDestroy(&p, NULL); tExprTreeDestroy(&p, NULL);
taosArrayDestroy(colList); taosArrayDestroy(colList);
if (taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0 && !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0 && !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "filter on tag not supported for normal table"); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "filter on tag not supported for normal table");
} }
} }
...@@ -4256,6 +4256,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE ...@@ -4256,6 +4256,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
pCondExpr->pTagCond = NULL; pCondExpr->pTagCond = NULL;
return ret; return ret;
} }
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql) { int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql) {
if (pExpr == NULL) { if (pExpr == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -6648,7 +6649,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS ...@@ -6648,7 +6649,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
return TSDB_CODE_TSC_INVALID_SQL; return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support filter expression");
} }
} else { } else {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tutil.h" #include "tutil.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "trpc.h" #include "trpc.h"
#include "dnode.h" #include "dnode.h"
...@@ -56,6 +57,7 @@ typedef struct { ...@@ -56,6 +57,7 @@ typedef struct {
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"tfile", tfInit, tfCleanup},
{"rpc", rpcInit, rpcCleanup}, {"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
......
...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { ...@@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont; SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) { if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
} }
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
} }
......
...@@ -51,9 +51,8 @@ typedef struct { ...@@ -51,9 +51,8 @@ typedef struct {
typedef void * twalh; // WAL HANDLE typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit(); int32_t walInit();
void walCleanUp(); void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg); twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh); void walStop(twalh);
......
...@@ -79,10 +79,13 @@ typedef struct { ...@@ -79,10 +79,13 @@ typedef struct {
int32_t (*fpRestored)(); int32_t (*fpRestored)();
} SSdbTableDesc; } SSdbTableDesc;
int32_t sdbInitRef();
void sdbCleanUpRef();
int32_t sdbInit(); int32_t sdbInit();
void sdbCleanUp(); void sdbCleanUp();
void * sdbOpenTable(SSdbTableDesc *desc); int64_t sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle); void sdbCloseTable(int64_t rid);
void* sdbGetTableByRid(int64_t rid);
bool sdbIsMaster(); bool sdbIsMaster();
bool sdbIsServing(); bool sdbIsServing();
void sdbUpdateMnodeRoles(); void sdbUpdateMnodeRoles();
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
int64_t tsAcctRid = -1;
void * tsAcctSdb = NULL; void * tsAcctSdb = NULL;
static int32_t tsAcctUpdateSize; static int32_t tsAcctUpdateSize;
static int32_t mnodeCreateRootAcct(); static int32_t mnodeCreateRootAcct();
...@@ -114,7 +115,8 @@ int32_t mnodeInitAccts() { ...@@ -114,7 +115,8 @@ int32_t mnodeInitAccts() {
.fpRestored = mnodeAcctActionRestored .fpRestored = mnodeAcctActionRestored
}; };
tsAcctSdb = sdbOpenTable(&desc); tsAcctRid = sdbOpenTable(&desc);
tsAcctSdb = sdbGetTableByRid(tsAcctRid);
if (tsAcctSdb == NULL) { if (tsAcctSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, failed to create hash", desc.name);
return -1; return -1;
...@@ -126,7 +128,7 @@ int32_t mnodeInitAccts() { ...@@ -126,7 +128,7 @@ int32_t mnodeInitAccts() {
void mnodeCleanupAccts() { void mnodeCleanupAccts() {
acctCleanUp(); acctCleanUp();
sdbCloseTable(tsAcctSdb); sdbCloseTable(tsAcctRid);
tsAcctSdb = NULL; tsAcctSdb = NULL;
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "mnodeShow.h" #include "mnodeShow.h"
#include "tglobal.h" #include "tglobal.h"
int64_t tsClusterRid = -1;
static void * tsClusterSdb = NULL; static void * tsClusterSdb = NULL;
static int32_t tsClusterUpdateSize; static int32_t tsClusterUpdateSize;
static char tsClusterId[TSDB_CLUSTER_ID_LEN]; static char tsClusterId[TSDB_CLUSTER_ID_LEN];
...@@ -101,9 +102,10 @@ int32_t mnodeInitCluster() { ...@@ -101,9 +102,10 @@ int32_t mnodeInitCluster() {
.fpRestored = mnodeClusterActionRestored .fpRestored = mnodeClusterActionRestored
}; };
tsClusterSdb = sdbOpenTable(&desc); tsClusterRid = sdbOpenTable(&desc);
tsClusterSdb = sdbGetTableByRid(tsClusterRid);
if (tsClusterSdb == NULL) { if (tsClusterSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, rid:%" PRId64 ", failed to create hash", desc.name, tsClusterRid);
return -1; return -1;
} }
...@@ -116,7 +118,7 @@ int32_t mnodeInitCluster() { ...@@ -116,7 +118,7 @@ int32_t mnodeInitCluster() {
} }
void mnodeCleanupCluster() { void mnodeCleanupCluster() {
sdbCloseTable(tsClusterSdb); sdbCloseTable(tsClusterRid);
tsClusterSdb = NULL; tsClusterSdb = NULL;
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#define VG_LIST_SIZE 8 #define VG_LIST_SIZE 8
int64_t tsDbRid = -1;
static void * tsDbSdb = NULL; static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
...@@ -160,7 +161,8 @@ int32_t mnodeInitDbs() { ...@@ -160,7 +161,8 @@ int32_t mnodeInitDbs() {
.fpRestored = mnodeDbActionRestored .fpRestored = mnodeDbActionRestored
}; };
tsDbSdb = sdbOpenTable(&desc); tsDbRid = sdbOpenTable(&desc);
tsDbSdb = sdbGetTableByRid(tsDbRid);
if (tsDbSdb == NULL) { if (tsDbSdb == NULL) {
mError("failed to init db data"); mError("failed to init db data");
return -1; return -1;
...@@ -496,7 +498,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) { ...@@ -496,7 +498,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) {
} }
void mnodeCleanupDbs() { void mnodeCleanupDbs() {
sdbCloseTable(tsDbSdb); sdbCloseTable(tsDbRid);
tsDbSdb = NULL; tsDbSdb = NULL;
} }
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include "mnodeCluster.h" #include "mnodeCluster.h"
int32_t tsAccessSquence = 0; int32_t tsAccessSquence = 0;
int64_t tsDnodeRid = -1;
static void * tsDnodeSdb = NULL; static void * tsDnodeSdb = NULL;
static int32_t tsDnodeUpdateSize = 0; static int32_t tsDnodeUpdateSize = 0;
extern void * tsMnodeSdb; extern void * tsMnodeSdb;
...@@ -187,7 +188,8 @@ int32_t mnodeInitDnodes() { ...@@ -187,7 +188,8 @@ int32_t mnodeInitDnodes() {
.fpRestored = mnodeDnodeActionRestored .fpRestored = mnodeDnodeActionRestored
}; };
tsDnodeSdb = sdbOpenTable(&desc); tsDnodeRid = sdbOpenTable(&desc);
tsDnodeSdb = sdbGetTableByRid(tsDnodeRid);
if (tsDnodeSdb == NULL) { if (tsDnodeSdb == NULL) {
mError("failed to init dnodes data"); mError("failed to init dnodes data");
return -1; return -1;
...@@ -213,7 +215,7 @@ int32_t mnodeInitDnodes() { ...@@ -213,7 +215,7 @@ int32_t mnodeInitDnodes() {
} }
void mnodeCleanupDnodes() { void mnodeCleanupDnodes() {
sdbCloseTable(tsDnodeSdb); sdbCloseTable(tsDnodeRid);
pthread_mutex_destroy(&tsDnodeEpsMutex); pthread_mutex_destroy(&tsDnodeEpsMutex);
free(tsDnodeEps); free(tsDnodeEps);
tsDnodeEps = NULL; tsDnodeEps = NULL;
......
...@@ -47,6 +47,7 @@ void *tsMnodeTmr = NULL; ...@@ -47,6 +47,7 @@ void *tsMnodeTmr = NULL;
static bool tsMgmtIsRunning = false; static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = { static const SMnodeComponent tsMnodeComponents[] = {
{"sdbref", sdbInitRef, sdbCleanUpRef},
{"profile", mnodeInitProfile, mnodeCleanupProfile}, {"profile", mnodeInitProfile, mnodeCleanupProfile},
{"cluster", mnodeInitCluster, mnodeCleanupCluster}, {"cluster", mnodeInitCluster, mnodeCleanupCluster},
{"accts", mnodeInitAccts, mnodeCleanupAccts}, {"accts", mnodeInitAccts, mnodeCleanupAccts},
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
int64_t tsMnodeRid = -1;
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell; static SRpcEpSet tsMnodeEpSetForShell;
...@@ -153,7 +154,8 @@ int32_t mnodeInitMnodes() { ...@@ -153,7 +154,8 @@ int32_t mnodeInitMnodes() {
.fpRestored = mnodeMnodeActionRestored .fpRestored = mnodeMnodeActionRestored
}; };
tsMnodeSdb = sdbOpenTable(&desc); tsMnodeRid = sdbOpenTable(&desc);
tsMnodeSdb = sdbGetTableByRid(tsMnodeRid);
if (tsMnodeSdb == NULL) { if (tsMnodeSdb == NULL) {
mError("failed to init mnodes data"); mError("failed to init mnodes data");
return -1; return -1;
...@@ -168,7 +170,7 @@ int32_t mnodeInitMnodes() { ...@@ -168,7 +170,7 @@ int32_t mnodeInitMnodes() {
} }
void mnodeCleanupMnodes() { void mnodeCleanupMnodes() {
sdbCloseTable(tsMnodeSdb); sdbCloseTable(tsMnodeRid);
tsMnodeSdb = NULL; tsMnodeSdb = NULL;
mnodeMnodeDestroyLock(); mnodeMnodeDestroyLock();
} }
...@@ -251,12 +253,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { ...@@ -251,12 +253,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForPeer; *epSet = tsMnodeEpSetForPeer;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForShell; *epSet = tsMnodeEpSetForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
} }
char* mnodeGetMnodeMasterEp() { char* mnodeGetMnodeMasterEp() {
......
...@@ -57,16 +57,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { ...@@ -57,16 +57,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
#define QUERY_STREAM_SAVE_SIZE 20 #define QUERY_STREAM_SAVE_SIZE 20
extern void *tsMnodeTmr;
static SCacheObj *tsMnodeConnCache = NULL; static SCacheObj *tsMnodeConnCache = NULL;
static int32_t tsConnIndex = 0; static int32_t tsConnIndex = 0;
......
...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { ...@@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet); mnodeGetMnodeEpSetForShell(epSet);
mDebug("msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "hash.h" #include "hash.h"
#include "tutil.h" #include "tutil.h"
#include "tref.h"
#include "tbalance.h" #include "tbalance.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
...@@ -98,6 +99,7 @@ typedef struct { ...@@ -98,6 +99,7 @@ typedef struct {
SSdbWorker *worker; SSdbWorker *worker;
} SSdbWorkerPool; } SSdbWorkerPool;
int32_t tsSdbRid;
extern void * tsMnodeTmr; extern void * tsMnodeTmr;
static void * tsSdbTmr; static void * tsSdbTmr;
static SSdbMgmt tsSdbMgmt = {0}; static SSdbMgmt tsSdbMgmt = {0};
...@@ -118,6 +120,7 @@ static void sdbFreeQueue(); ...@@ -118,6 +120,7 @@ static void sdbFreeQueue();
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow);
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow);
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow);
static void sdbCloseTableObj(void *handle);
int32_t sdbGetId(void *pTable) { int32_t sdbGetId(void *pTable) {
return ((SSdbTable *)pTable)->autoIndex; return ((SSdbTable *)pTable)->autoIndex;
...@@ -385,6 +388,17 @@ void sdbUpdateSync(void *pMnodes) { ...@@ -385,6 +388,17 @@ void sdbUpdateSync(void *pMnodes) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
} }
int32_t sdbInitRef() {
tsSdbRid = taosOpenRef(10, sdbCloseTableObj);
if (tsSdbRid <= 0) {
sdbError("failed to init sdb ref");
return -1;
}
return 0;
}
void sdbCleanUpRef() { taosCloseRef(tsSdbRid); }
int32_t sdbInit() { int32_t sdbInit() {
pthread_mutex_init(&tsSdbMgmt.mutex, NULL); pthread_mutex_init(&tsSdbMgmt.mutex, NULL);
...@@ -423,7 +437,7 @@ void sdbCleanUp() { ...@@ -423,7 +437,7 @@ void sdbCleanUp() {
walClose(tsSdbMgmt.wal); walClose(tsSdbMgmt.wal);
tsSdbMgmt.wal = NULL; tsSdbMgmt.wal = NULL;
} }
pthread_mutex_destroy(&tsSdbMgmt.mutex); pthread_mutex_destroy(&tsSdbMgmt.mutex);
} }
...@@ -506,7 +520,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -506,7 +520,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_add_fetch_32(&pTable->autoIndex, 1); atomic_add_fetch_32(&pTable->autoIndex, 1);
} }
sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg);
int32_t code = (*pTable->fpInsert)(pRow); int32_t code = (*pTable->fpInsert)(pRow);
...@@ -542,7 +556,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -542,7 +556,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
atomic_sub_fetch_32(&pTable->numOfRows, 1); atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
sdbDecRef(pTable, pRow->pObj); sdbDecRef(pTable, pRow->pObj);
...@@ -551,7 +565,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { ...@@ -551,7 +565,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
} }
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) {
sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbTrace("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name,
sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg);
(*pTable->fpUpdate)(pRow); (*pTable->fpUpdate)(pRow);
...@@ -649,7 +663,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -649,7 +663,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
return syncCode; return syncCode;
} }
sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, sdbTrace("vgId:1, sdb:%s, record from %s is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, qtypeStr[qtype],
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
...@@ -801,10 +815,10 @@ void sdbFreeIter(void *tparam, void *pIter) { ...@@ -801,10 +815,10 @@ void sdbFreeIter(void *tparam, void *pIter) {
taosHashCancelIterate(pTable->iHandle, pIter); taosHashCancelIterate(pTable->iHandle, pIter);
} }
void *sdbOpenTable(SSdbTableDesc *pDesc) { int64_t sdbOpenTable(SSdbTableDesc *pDesc) {
SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable));
if (pTable == NULL) return NULL; if (pTable == NULL) return -1;
pthread_mutex_init(&pTable->mutex, NULL); pthread_mutex_init(&pTable->mutex, NULL);
tstrncpy(pTable->name, pDesc->name, SDB_TABLE_LEN); tstrncpy(pTable->name, pDesc->name, SDB_TABLE_LEN);
...@@ -829,10 +843,21 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { ...@@ -829,10 +843,21 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
tsSdbMgmt.numOfTables++; tsSdbMgmt.numOfTables++;
tsSdbMgmt.tableList[pTable->id] = pTable; tsSdbMgmt.tableList[pTable->id] = pTable;
return pTable;
return taosAddRef(tsSdbRid, pTable);
}
void sdbCloseTable(int64_t rid) {
taosRemoveRef(tsSdbRid, rid);
}
void *sdbGetTableByRid(int64_t rid) {
void *handle = taosAcquireRef(tsSdbRid, rid);
taosReleaseRef(tsSdbRid, rid);
return handle;
} }
void sdbCloseTable(void *handle) { static void sdbCloseTableObj(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
if (pTable == NULL) return; if (pTable == NULL) return;
...@@ -855,6 +880,7 @@ void sdbCloseTable(void *handle) { ...@@ -855,6 +880,7 @@ void sdbCloseTable(void *handle) {
taosHashCancelIterate(pTable->iHandle, pIter); taosHashCancelIterate(pTable->iHandle, pIter);
taosHashCleanup(pTable->iHandle); taosHashCleanup(pTable->iHandle);
pTable->iHandle = NULL;
pthread_mutex_destroy(&pTable->mutex); pthread_mutex_destroy(&pTable->mutex);
sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables); sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables);
......
...@@ -52,7 +52,6 @@ static bool mnodeCheckShowFinished(SShowObj *pShow); ...@@ -52,7 +52,6 @@ static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodePutShowObj(SShowObj *pShow); static void *mnodePutShowObj(SShowObj *pShow);
static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove);
extern void *tsMnodeTmr;
static void *tsMnodeShowCache = NULL; static void *tsMnodeShowCache = NULL;
static int32_t tsShowObjIndex = 0; static int32_t tsShowObjIndex = 0;
static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
......
...@@ -49,7 +49,9 @@ ...@@ -49,7 +49,9 @@
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
#define CREATE_CTABLE_RETRY_SEC 14 #define CREATE_CTABLE_RETRY_SEC 14
int64_t tsCTableRid = -1;
static void * tsChildTableSdb; static void * tsChildTableSdb;
int64_t tsSTableRid = -1;
static void * tsSuperTableSdb; static void * tsSuperTableSdb;
static int32_t tsChildTableUpdateSize; static int32_t tsChildTableUpdateSize;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
...@@ -350,7 +352,7 @@ static int32_t mnodeInitChildTables() { ...@@ -350,7 +352,7 @@ static int32_t mnodeInitChildTables() {
SCTableObj tObj; SCTableObj tObj;
tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type;
SSdbTableDesc tableDesc = { SSdbTableDesc desc = {
.id = SDB_TABLE_CTABLE, .id = SDB_TABLE_CTABLE,
.name = "ctables", .name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
...@@ -366,7 +368,8 @@ static int32_t mnodeInitChildTables() { ...@@ -366,7 +368,8 @@ static int32_t mnodeInitChildTables() {
.fpRestored = mnodeChildTableActionRestored .fpRestored = mnodeChildTableActionRestored
}; };
tsChildTableSdb = sdbOpenTable(&tableDesc); tsCTableRid = sdbOpenTable(&desc);
tsChildTableSdb = sdbGetTableByRid(tsCTableRid);
if (tsChildTableSdb == NULL) { if (tsChildTableSdb == NULL) {
mError("failed to init child table data"); mError("failed to init child table data");
return -1; return -1;
...@@ -377,7 +380,7 @@ static int32_t mnodeInitChildTables() { ...@@ -377,7 +380,7 @@ static int32_t mnodeInitChildTables() {
} }
static void mnodeCleanupChildTables() { static void mnodeCleanupChildTables() {
sdbCloseTable(tsChildTableSdb); sdbCloseTable(tsCTableRid);
tsChildTableSdb = NULL; tsChildTableSdb = NULL;
} }
...@@ -543,7 +546,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -543,7 +546,7 @@ static int32_t mnodeInitSuperTables() {
SSTableObj tObj; SSTableObj tObj;
tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type;
SSdbTableDesc tableDesc = { SSdbTableDesc desc = {
.id = SDB_TABLE_STABLE, .id = SDB_TABLE_STABLE,
.name = "stables", .name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
...@@ -559,7 +562,8 @@ static int32_t mnodeInitSuperTables() { ...@@ -559,7 +562,8 @@ static int32_t mnodeInitSuperTables() {
.fpRestored = mnodeSuperTableActionRestored .fpRestored = mnodeSuperTableActionRestored
}; };
tsSuperTableSdb = sdbOpenTable(&tableDesc); tsSTableRid = sdbOpenTable(&desc);
tsSuperTableSdb = sdbGetTableByRid(tsSTableRid);
if (tsSuperTableSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init stables data"); mError("failed to init stables data");
return -1; return -1;
...@@ -570,7 +574,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -570,7 +574,7 @@ static int32_t mnodeInitSuperTables() {
} }
static void mnodeCleanupSuperTables() { static void mnodeCleanupSuperTables() {
sdbCloseTable(tsSuperTableSdb); sdbCloseTable(tsSTableRid);
tsSuperTableSdb = NULL; tsSuperTableSdb = NULL;
} }
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "mnodeWrite.h" #include "mnodeWrite.h"
#include "mnodePeer.h" #include "mnodePeer.h"
int64_t tsUserRid = -1;
static void * tsUserSdb = NULL; static void * tsUserSdb = NULL;
static int32_t tsUserUpdateSize = 0; static int32_t tsUserUpdateSize = 0;
static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
...@@ -165,7 +166,8 @@ int32_t mnodeInitUsers() { ...@@ -165,7 +166,8 @@ int32_t mnodeInitUsers() {
.fpRestored = mnodeUserActionRestored .fpRestored = mnodeUserActionRestored
}; };
tsUserSdb = sdbOpenTable(&desc); tsUserRid = sdbOpenTable(&desc);
tsUserSdb = sdbGetTableByRid(tsUserRid);
if (tsUserSdb == NULL) { if (tsUserSdb == NULL) {
mError("table:%s, failed to create hash", desc.name); mError("table:%s, failed to create hash", desc.name);
return -1; return -1;
...@@ -185,7 +187,7 @@ int32_t mnodeInitUsers() { ...@@ -185,7 +187,7 @@ int32_t mnodeInitUsers() {
} }
void mnodeCleanupUsers() { void mnodeCleanupUsers() {
sdbCloseTable(tsUserSdb); sdbCloseTable(tsUserRid);
tsUserSdb = NULL; tsUserSdb = NULL;
} }
......
...@@ -51,6 +51,7 @@ char* vgroupStatus[] = { ...@@ -51,6 +51,7 @@ char* vgroupStatus[] = {
"updating" "updating"
}; };
int64_t tsVgroupRid = -1;
static void *tsVgroupSdb = NULL; static void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0; static int32_t tsVgUpdateSize = 0;
...@@ -222,7 +223,8 @@ int32_t mnodeInitVgroups() { ...@@ -222,7 +223,8 @@ int32_t mnodeInitVgroups() {
.fpRestored = mnodeVgroupActionRestored, .fpRestored = mnodeVgroupActionRestored,
}; };
tsVgroupSdb = sdbOpenTable(&desc); tsVgroupRid = sdbOpenTable(&desc);
tsVgroupSdb = sdbGetTableByRid(tsVgroupRid);
if (tsVgroupSdb == NULL) { if (tsVgroupSdb == NULL) {
mError("failed to init vgroups data"); mError("failed to init vgroups data");
return -1; return -1;
...@@ -610,7 +612,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { ...@@ -610,7 +612,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
} }
void mnodeCleanupVgroups() { void mnodeCleanupVgroups() {
sdbCloseTable(tsVgroupSdb); sdbCloseTable(tsVgroupRid);
tsVgroupSdb = NULL; tsVgroupSdb = NULL;
} }
......
...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { ...@@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SRpcEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT; return TSDB_CODE_RPC_REDIRECT;
} }
......
...@@ -20,17 +20,6 @@ ...@@ -20,17 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#define tread(fd, buf, count) read(fd, buf, count)
#define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
int64_t taosReadImp(int32_t fd, void *buf, int64_t count); int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
...@@ -40,7 +29,13 @@ int64_t taosCopy(char *from, char *to); ...@@ -40,7 +29,13 @@ int64_t taosCopy(char *from, char *to);
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count) #define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) #define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence) #define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
#define taosClose(x) tclose(x) #define taosClose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
// TAOS_OS_FUNC_FILE_SENDIFLE // TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
......
...@@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) { ...@@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
} }
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
return (int64_t)tlseek(fd, (long)offset, whence); return (int64_t)lseek(fd, (long)offset, whence);
} }
int64_t taosCopy(char *from, char *to) { int64_t taosCopy(char *from, char *to) {
......
...@@ -63,9 +63,11 @@ typedef struct SSqlGroupbyExpr { ...@@ -63,9 +63,11 @@ typedef struct SSqlGroupbyExpr {
typedef struct SResultRow { typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t rowId:15; int32_t rowId:29; // row index in buffer page
bool closed:1; // this result status: closed or opened bool startInterp; // the time window start timestamp has done the interpolation already.
uint16_t numOfRows; // number of rows of current time window bool endInterp; // the time window end timestamp has done the interpolation already.
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current time window union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow; } SResultRow;
...@@ -187,6 +189,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -187,6 +189,7 @@ typedef struct SQueryRuntimeEnv {
bool topBotQuery; // false bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
...@@ -195,6 +198,8 @@ typedef struct SQueryRuntimeEnv { ...@@ -195,6 +198,8 @@ typedef struct SQueryRuntimeEnv {
SResultRowPool* pool; // window result object pool SResultRowPool* pool; // window result object pool
int32_t* rowCellInfoOffset;// offset value for each row result cell info int32_t* rowCellInfoOffset;// offset value for each row result cell info
char** prevRow;
char** nextRow;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
......
...@@ -152,6 +152,11 @@ typedef struct SResultRowCellInfo { ...@@ -152,6 +152,11 @@ typedef struct SResultRowCellInfo {
uint32_t numOfRes; // num of output result in current buffer uint32_t numOfRes; // num of output result in current buffer
} SResultRowCellInfo; } SResultRowCellInfo;
typedef struct SPoint1 {
int64_t key;
double val;
} SPoint1;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
struct SQLFunctionCtx; struct SQLFunctionCtx;
...@@ -194,6 +199,8 @@ typedef struct SQLFunctionCtx { ...@@ -194,6 +199,8 @@ typedef struct SQLFunctionCtx {
SResultRowCellInfo *resultInfo; SResultRowCellInfo *resultInfo;
SExtTagsInfo tagInfo; SExtTagsInfo tagInfo;
SPoint1 start;
SPoint1 end;
} SQLFunctionCtx; } SQLFunctionCtx;
typedef struct SQLAggFuncElem { typedef struct SQLAggFuncElem {
...@@ -243,21 +250,11 @@ enum { ...@@ -243,21 +250,11 @@ enum {
}; };
typedef struct STwaInfo { typedef struct STwaInfo {
TSKEY lastKey; TSKEY lastKey;
int8_t hasResult; // flag to denote has value int8_t hasResult; // flag to denote has value
int16_t type; // source data type double dOutput;
TSKEY SKey; double lastValue;
TSKEY EKey; STimeWindow win;
union {
double dOutput;
int64_t iOutput;
};
union {
double dLastValue;
int64_t iLastValue;
};
} STwaInfo; } STwaInfo;
/* global sql function array */ /* global sql function array */
...@@ -276,8 +273,6 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha ...@@ -276,8 +273,6 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
(_r)->initialized = false; \ (_r)->initialized = false; \
} while (0) } while (0)
//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf);
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) { static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag pResInfo->initialized = true; // the this struct has been initialized flag
......
...@@ -370,6 +370,66 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -370,6 +370,66 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
#endif #endif
} }
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
switch(type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t* p = (int8_t*) dest;
int8_t* pSrc = (int8_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t* p = (int16_t*) dest;
int16_t* pSrc = (int16_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t* p = (int32_t*) dest;
int32_t* pSrc = (int32_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t* p = (int64_t*) dest;
int64_t* pSrc = (int64_t*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* p = (float*) dest;
float* pSrc = (float*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double* p = (double*) dest;
double* pSrc = (double*) src;
for(int32_t i = 0; i < numOfRows; ++i) {
p[i] = pSrc[numOfRows - i - 1];
}
break;
}
default: assert(0);
}
}
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*getSourceDataBlock)(void *, const char*, int32_t)) { char *(*getSourceDataBlock)(void *, const char*, int32_t)) {
if (pExprs == NULL) { if (pExprs == NULL) {
...@@ -387,6 +447,8 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, ...@@ -387,6 +447,8 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
/* the right output has result from the right child syntax tree */ /* the right output has result from the right child syntax tree */
char *pRightOutput = malloc(sizeof(int64_t) * numOfRows); char *pRightOutput = malloc(sizeof(int64_t) * numOfRows);
char *pdata = malloc(sizeof(int64_t) * numOfRows);
if (pRight->nodeType == TSQL_NODE_EXPR) { if (pRight->nodeType == TSQL_NODE_EXPR) {
tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock); tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock);
} }
...@@ -398,52 +460,75 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, ...@@ -398,52 +460,75 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
* the type of returned value of one expression is always double float precious * the type of returned value of one expression is always double float precious
*/ */
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, order); fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // exprLeft + columnRight } else if (pRight->nodeType == TSQL_NODE_COL) { // exprLeft + columnRight
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr);
// set input buffer // set input buffer
char *pInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); char *pInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
fp(pLeftOutput, pInputData, numOfRows, numOfRows, pOutput, order); if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pInputData, pRight->pSchema->type, numOfRows);
fp(pLeftOutput, pdata, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
} else {
fp(pLeftOutput, pInputData, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // exprLeft + 12 } else if (pRight->nodeType == TSQL_NODE_VALUE) { // exprLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr);
fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, order); fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
} }
} else if (pLeft->nodeType == TSQL_NODE_COL) { } else if (pLeft->nodeType == TSQL_NODE_COL) {
// column data specified on left-hand-side // column data specified on left-hand-side
char *pLeftInputData = getSourceDataBlock(param, pLeft->pSchema->name, pLeft->pSchema->colId); char *pLeftInputData = getSourceDataBlock(param, pLeft->pSchema->name, pLeft->pSchema->colId);
if (pRight->nodeType == TSQL_NODE_EXPR) { // columnLeft + expr2 if (pRight->nodeType == TSQL_NODE_EXPR) { // columnLeft + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(pLeftInputData, pRightOutput, numOfRows, numOfRows, pOutput, order);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
fp(pdata, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
} else {
fp(pLeftInputData, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC);
}
} else if (pRight->nodeType == TSQL_NODE_COL) { // columnLeft + columnRight } else if (pRight->nodeType == TSQL_NODE_COL) { // columnLeft + columnRight
// column data specified on right-hand-side // column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr);
fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order);
// both columns are descending order, do not reverse the source data
fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order);
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // columnLeft + 12 } else if (pRight->nodeType == TSQL_NODE_VALUE) { // columnLeft + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr);
fp(pLeftInputData, &pRight->pVal->i64Key, numOfRows, 1, pOutput, order);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows);
fp(pdata, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
} else {
fp(pLeftInputData, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
}
} }
} else { } else {
// column data specified on left-hand-side // column data specified on left-hand-side
if (pRight->nodeType == TSQL_NODE_EXPR) { // 12 + expr2 if (pRight->nodeType == TSQL_NODE_EXPR) { // 12 + expr2
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, order); fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, TSDB_ORDER_ASC);
} else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight } else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight
// column data specified on right-hand-side // column data specified on right-hand-side
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, pRightInputData, 1, numOfRows, pOutput, order);
if (order == TSDB_ORDER_DESC) {
reverseCopy(pdata, pRightInputData, pRight->pSchema->type, numOfRows);
fp(&pLeft->pVal->i64Key, pdata, numOfRows, 1, pOutput, TSDB_ORDER_ASC);
} else {
fp(&pLeft->pVal->i64Key, pRightInputData, 1, numOfRows, pOutput, TSDB_ORDER_ASC);
}
} else if (pRight->nodeType == TSQL_NODE_VALUE) { // 12 + 12 } else if (pRight->nodeType == TSQL_NODE_VALUE) { // 12 + 12
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr); _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr);
fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, order); fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, TSDB_ORDER_ASC);
} }
} }
......
此差异已折叠。
...@@ -395,11 +395,10 @@ uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -395,11 +395,10 @@ uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
} }
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if ((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) {
pRuntimeEnv->groupbyNormalCol) {
return 0; return 0;
} }
STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current); STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable);
return id->uid; return id->uid;
} }
\ No newline at end of file
...@@ -20,23 +20,26 @@ ...@@ -20,23 +20,26 @@
extern "C" { extern "C" {
#endif #endif
#include <unistd.h>
// init taos file module // init taos file module
int32_t tfinit(); int32_t tfInit();
// clean up taos file module // clean up taos file module
void tfcleanup(); void tfCleanup();
// the same syntax as UNIX standard open/close/read/write // the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused // but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags); int64_t tfOpen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd); int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count); int64_t tfClose(int64_t tfd);
int64_t tfread(int64_t tfd, void *buf, int64_t count); int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_TREF_H #endif // TDENGINE_TFILE_H
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tulog.h" #include "tulog.h"
...@@ -21,40 +22,52 @@ ...@@ -21,40 +22,52 @@
static int32_t tsFileRsetId = -1; static int32_t tsFileRsetId = -1;
static void taosCloseFile(void *p) { static void tfCloseFile(void *p) {
close((int32_t)(uintptr_t)p); close((int32_t)(uintptr_t)p);
} }
int32_t tfinit() { int32_t tfInit() {
tsFileRsetId = taosOpenRef(2000, taosCloseFile); tsFileRsetId = taosOpenRef(2000, tfCloseFile);
return tsFileRsetId; if (tsFileRsetId > 0) {
return 0;
} else {
return -1;
}
} }
void tfcleanup() { void tfCleanup() {
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1; tsFileRsetId = -1;
} }
int64_t tfopen(const char *pathname, int32_t flags) { static int64_t tfOpenImp(int32_t fd) {
int32_t fd = open(pathname, flags);
if (fd < 0) { if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
void *p = (void *)(int64_t)fd; void * p = (void *)(int64_t)fd;
int64_t rid = taosAddRef(tsFileRsetId, p); int64_t rid = taosAddRef(tsFileRsetId, p);
if (rid < 0) close(fd); if (rid < 0) close(fd);
return rid; return rid;
} }
int64_t tfclose(int64_t tfd) { int64_t tfOpen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
return tfOpenImp(fd);
}
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = open(pathname, flags, mode);
return tfOpenImp(fd);
}
int64_t tfClose(int64_t tfd) {
return taosRemoveRef(tsFileRsetId, tfd); return taosRemoveRef(tsFileRsetId, tfd);
} }
int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd); void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1; if (p == NULL) return -1;
...@@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { ...@@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
return ret; return ret;
} }
int64_t tfread(int64_t tfd, void *buf, int64_t count) { int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd); void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1; if (p == NULL) return -1;
...@@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) { ...@@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) {
taosReleaseRef(tsFileRsetId, tfd); taosReleaseRef(tsFileRsetId, tfd);
return ret; return ret;
} }
int64_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return fsync(fd);
}
bool tfValid(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
return p != NULL;
}
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosLSeek(fd, offset, whence);
}
int32_t tfFtruncate(int64_t tfd, int64_t length) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosFtruncate(fd, length);
}
...@@ -44,8 +44,8 @@ typedef struct { ...@@ -44,8 +44,8 @@ typedef struct {
uint64_t version; uint64_t version;
int64_t fileId; int64_t fileId;
int64_t rid; int64_t rid;
int64_t tfd;
int32_t vgId; int32_t vgId;
int32_t fd;
int32_t keep; int32_t keep;
int32_t level; int32_t level;
int32_t fsyncPeriod; int32_t fsyncPeriod;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h" #include "tref.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
...@@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
} }
pWal->vgId = pCfg->vgId; pWal->vgId = pCfg->vgId;
pWal->fd = -1; pWal->tfd = -1;
pWal->fileId = -1; pWal->fileId = -1;
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep; pWal->keep = pCfg->keep;
...@@ -124,7 +125,7 @@ void walClose(void *handle) { ...@@ -124,7 +125,7 @@ void walClose(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
taosClose(pWal->fd); tfClose(pWal->tfd);
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal->rid); taosRemoveRef(tsWal.refId, pWal->rid);
} }
...@@ -143,7 +144,7 @@ static void walFreeObj(void *wal) { ...@@ -143,7 +144,7 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal; SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
taosClose(pWal->fd); tfClose(pWal->tfd);
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
tfree(pWal); tfree(pWal);
} }
...@@ -172,7 +173,7 @@ static void walFsyncAll() { ...@@ -172,7 +173,7 @@ static void walFsyncAll() {
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
int32_t code = fsync(pWal->fd); int32_t code = tfFsync(pWal->tfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
...@@ -36,8 +37,8 @@ int32_t walRenew(void *handle) { ...@@ -36,8 +37,8 @@ int32_t walRenew(void *handle) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) { if (tfValid(pWal->tfd)) {
tclose(pWal->fd); tfClose(pWal->tfd);
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
} }
...@@ -49,9 +50,9 @@ int32_t walRenew(void *handle) { ...@@ -49,9 +50,9 @@ int32_t walRenew(void *handle) {
} }
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (!tfValid(pWal->tfd)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
...@@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) { ...@@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL) return; if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return; if (pWal->keep == TAOS_WAL_KEEP) return;
if (pWal->fd <= 0) return; if (!tfValid(pWal->tfd)) return;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
...@@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
int32_t code = 0; int32_t code = 0;
// no wal // no wal
if (pWal->fd <= 0) return 0; if (!tfValid(pWal->tfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0; if (pHead->version <= pWal->version) return 0;
...@@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (taosWrite(pWal->fd, pHead, contLen) != contLen) { if (tfWrite(pWal->tfd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len); pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version; pWal->version = pHead->version;
} }
...@@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
void walFsync(void *handle, bool forceFsync) { void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->fd < 0) return; if (pWal == NULL || !tfValid(pWal->tfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId);
if (fsync(pWal->fd) < 0) { if (tfFsync(pWal->tfd) < 0) {
wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno));
} }
} }
...@@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { ...@@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
// open the existing WAL file in append mode // open the existing WAL file in append mode
pWal->fileId = 0; pWal->fileId = 0;
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (!tfValid(pWal->tfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
...@@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { ...@@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
return code; return code;
} }
static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) { static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
taosFtruncate(fd, offset); tfFtruncate(tfd, offset);
fsync(fd); tfFsync(tfd);
} }
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) { static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset; int64_t pos = *offset;
while (1) { while (1) {
pos++; pos++;
if (lseek(fd, pos, SEEK_SET) < 0) { if (tfLseek(tfd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) { if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
...@@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
int32_t fd = open(name, O_RDWR); int64_t tfd = tfOpen(name, O_RDWR);
if (fd < 0) { if (!tfValid(tfd)) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer); tfree(buffer);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
...@@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer; SWalHead *pHead = buffer;
while (1) { while (1) {
int32_t ret = taosRead(fd, pHead, sizeof(SWalHead)); int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
...@@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
walFtruncate(pWal, fd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, fd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
} }
...@@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead = buffer; pHead = buffer;
} }
ret = taosRead(fd, pHead->cont, pHead->len); ret = tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }
tclose(fd); tfClose(tfd);
tfree(buffer); tfree(buffer);
return code; return code;
......
import taos
import datetime
import random
import multiprocessing
def taos_excute(table, connect_host):
conn = taos.connect(host=connect_host, user="root", password="taosdata", config="/etc/taos", database='test')
cursor = conn.cursor()
for i in range(1000000):
pk = random.randint(100001, 300000)
time_now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
col1 = random.randint(1, 10000)
col2 = random.randint(1, 10000)
col3 = random.randint(1, 10000)
col4 = random.randint(1, 10000)
col5 = random.randint(1, 10000)
col6 = random.randint(1, 10000)
sql = f"INSERT INTO {table}_{pk} USING {table} TAGS ({pk}) VALUES ('{time_now}', {col1}, {col2}, {col3}, {col4}, {col5}, {col6})"
cursor.execute(sql)
cursor.close()
conn.close()
def taos_init(table, connect_host, pk):
conn = taos.connect(host=connect_host, user="root", password="taosdata", config="/etc/taos", database='test')
cursor = conn.cursor()
sql = f"CREATE TABLE {table}_{pk} USING {table} TAGS ({pk})"
cursor.execute(sql)
cursor.close()
conn.close()
print("init time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
connect_list = ["node1", "node2", "node3", "node4", "node5"]
pool = multiprocessing.Pool(processes=108)
for pk in range(100001, 300000):
pool.apply_async(func=taos_init, args=("test", connect_list[pk % 5], pk, ))
print("start time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
for i in range(10000):
pool.apply_async(func=taos_excute, args=("test", connect_list[i % 5],))
pool.close()
pool.join()
print("end time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
import time
import datetime
import csv
import random
import pandas as pd
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1500074556514
def writeCSV(self):
with open('test3.csv','w', encoding='utf-8', newline='') as csvFile:
writer = csv.writer(csvFile, dialect='excel')
for i in range(1000000):
newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000)
d = datetime.datetime.fromtimestamp(newTimestamp / 1000)
dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f"))
writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
def removCSVHeader(self):
data = pd.read_csv("ordered.csv")
data = data.drop([0])
data.to_csv("ordered.csv", header = False, index = False)
def run(self):
tdSql.prepare()
tdSql.execute("create table t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
tdSql.execute("insert into t1 file 'outoforder.csv'")
duration = time.time() - startTime
print("Out of Order - Insert time: %d" % duration)
tdSql.query("select count(*) from t1")
rows = tdSql.getData(0, 0)
tdSql.execute("create table t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
tdSql.execute("insert into t2 file 'ordered.csv'")
duration = time.time() - startTime
print("Ordered - Insert time: %d" % duration)
tdSql.query("select count(*) from t2")
tdSql.checkData(0,0, rows)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -117,16 +117,17 @@ run general/parser/col_arithmetic_query.sim ...@@ -117,16 +117,17 @@ run general/parser/col_arithmetic_query.sim
# ================================================================================================ # ================================================================================================
print ====================> crash print ====================> crash
# sql select spread(ts )/(1000*3600*24) from ca_stb0 interval(1y) sql use $db
sql select spread(ts )/(1000*3600*24) from $stb interval(1y)
sql_error select first(c1, c2) - last(c1, c2) from stb interval(1y)
sql_error select first(ts) - last(ts) from stb interval(1y) sql_error select first(c1, c2) - last(c1, c2) from $stb interval(1y)
sql_error select top(c1, 2) - last(c1) from stb; sql_error select first(ts) - last(ts) from $stb interval(1y)
sql_error select stddev(c1) - last(c1) from stb; sql_error select top(c1, 2) - last(c1) from $stb;
sql_error select diff(c1) - last(c1) from stb; sql_error select stddev(c1) - last(c1) from $stb;
sql_error select first(c7) - last(c7) from stb; sql_error select diff(c1) - last(c1) from $stb;
sql_error select first(c8) - last(c8) from stb; sql_error select first(c7) - last(c7) from $stb;
sql_error select first(c9) - last(c9) from stb; sql_error select first(c8) - last(c8) from $stb;
sql_error select first(c9) - last(c9) from $stb;
sql_error select max(c2*2) from $tb sql_error select max(c2*2) from $tb
sql_error select max(c1-c2) from $tb sql_error select max(c1-c2) from $tb
......
...@@ -62,24 +62,73 @@ if $data91 != 1.000000000 then ...@@ -62,24 +62,73 @@ if $data91 != 1.000000000 then
return -1 return -1
endi endi
sql select (c1 * 2) % 7.9 from $tb order by ts desc; sql select (c1 * 2) % 7.9, c1*1, c1*1*1, c1*c1, c1*c1*c1 from $tb order by ts desc;
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi
if $data00 != 0.100000000 then if $data00 != 2.200000000 then
print expect 0.100000000, acutal:$data00 print expect 2.200000000, actual:$data00
return -1 return -1
endi endi
if $data10 != 2.100000000 then if $data01 != 9.000000000 then
return -1 return -1
endi endi
if $data90 != 6.000000000 then if $data02 != 9.000000000 then
return -1
endi
if $data03 != 81.000000000 then
return -1
endi
if $data04 != 729.000000000 then
return -1
endi
if $data10 != 0.200000000 then
return -1
endi
if $data11 != 8.000000000 then
return -1
endi
if $data12 != 8.000000000 then
return -1
endi
if $data13 != 64.000000000 then
return -1
endi
if $data14 != 512.000000000 then
return -1
endi
if $data90 != 0.000000000 then
return -1 return -1
endi endi
if $data91 != 0.000000000 then
return -1
endi
if $data92 != 0.000000000 then
return -1
endi
if $data93 != 0.000000000 then
return -1
endi
if $data94 != 0.000000000 then
return -1
endi
# [d.3] # [d.3]
sql select c1 * c2 /4 from $tb where ts < 1537166000000 and ts > 1537156000000 sql select c1 * c2 /4 from $tb where ts < 1537166000000 and ts > 1537156000000
if $rows != 17 then if $rows != 17 then
...@@ -95,7 +144,7 @@ if $data10 != 16.000000000 then ...@@ -95,7 +144,7 @@ if $data10 != 16.000000000 then
endi endi
if $data20 != 20.250000000 then if $data20 != 20.250000000 then
print expect 20.250000000, acutal:$data21 print expect 20.250000000, actual:$data21
return -1 return -1
endi endi
...@@ -320,7 +369,9 @@ sql_error select c7-c9 from $tb interval(2y) ...@@ -320,7 +369,9 @@ sql_error select c7-c9 from $tb interval(2y)
# multiple retrieve [d.20]=============================================================== # multiple retrieve [d.20]===============================================================
sql select c2-c2, 911 from $tb sql select c2-c2, 911 from $tb
#======================================= aggregation function arithmetic query cases ================ #======================================= aggregation function arithmetic query cases ===================================
# on $tb percentile() spread(ts) bug
# asc/desc order [d.2] # asc/desc order [d.2]
sql select first(c1) * ( 2 / 3 ) from $stb order by ts asc; sql select first(c1) * ( 2 / 3 ) from $stb order by ts asc;
if $rows != 1 then if $rows != 1 then
...@@ -349,11 +400,11 @@ if $data00 != 1.800000000 then ...@@ -349,11 +400,11 @@ if $data00 != 1.800000000 then
return -1 return -1
endi endi
if $data01 != 100000 then if $data01 != 100000.000000000 then
return -1 return -1
endi endi
if $data02 != 200000 then if $data02 != 200000.000000000 then
return -1 return -1
endi endi
...@@ -374,77 +425,192 @@ if $data02 != 9.000000020 then ...@@ -374,77 +425,192 @@ if $data02 != 9.000000020 then
return -1 return -1
endi endi
# all possible function in the arithmetic expressioin # all possible function in the arithmetic expression, add more
sql select min(c1) * max(c2) /4, sum(c1) * percentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2) from $stb where ts < and ts > sql select min(c1) * max(c2) /4, sum(c1) * apercentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2), count(1)/sum(c1), avg(c2)*count(c2) from $stb where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-11-25 19:30:00.000';
if $rows != 1 then
return -1
endi
# no result return [d.3] if $data00 != 0.000000000 then
return -1
endi
if $data01 != 225000.000000000 then
return -1
endi
if $data02 != 8.077777778 then
return -1
endi
if $data03 != inf then
return -1
endi
if $data04 != 0.444444444 then
return -1
endi
if $data05 != 450000.000000000 then
return -1
endi
# no result return [d.3]===============================================================
sql select first(c1) * 91 - 7, last(c3) from $stb where ts < 1537146000000 sql select first(c1) * 91 - 7, last(c3) from $stb where ts < 1537146000000
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
# no result return [d.3] # no result return [d.3]
sql select sum(c2) - avg(c2) from $tb where ts>xxx sql select sum(c2) - avg(c2) from $stb where ts > '2018-11-25 19:30:00.000'
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
# single row result aggregation [d.4] # single row result aggregation [d.4]===================================================
sql select # all those cases are aggregation test cases.
# error cases # error cases
sql_error select first(c1, c2) - last(c1, c2) from $tb sql_error select first(c1, c2) - last(c1, c2) from $stb
sql_error select top(c1, 5) - bottom(c1, 5) from $stb
sql_error select first(*) - 99 from $stb
# multi row result aggregation [d.4] # multi row result aggregation [d.4]
sql select top(c1, 1) - bottom(c1, 1) from $tb sql_error select top(c1, 1) - bottom(c1, 1) from $stb
sql select top(c1, 99) - bottom(c1, 99) from $tb sql_error select top(c1, 99) - bottom(c1, 99) from $stb
# query on super table [d.5]=============================================================
# all cases in this part are query on super table
# all data types [d.6] # all data types [d.6]===================================================================
sql select c2-c1, c3/c2, c4*c3, c5%c4, c6+99%22 from $tb sql select c2-c1, c3/c2, c4*c3, c5%c4, c6+99%22 from $stb
# error case, ts/bool/binary/nchar not support arithmetic expression # error case, ts/bool/binary/nchar not support arithmetic expression
sql_error select ts+ts from $tb sql_error select first(c7)*12 from $stb
sql_error select ts+22 from $tb sql_error select last(c8)/55 from $stb
sql_error select c7*12 from $tb sql_error select last_row(c9) + last_row(c8) from $stb
sql_error select c8/55 from $tb
sql_error select c9+c8 from $tb
# arithmetic expression in join [d.7] # arithmetic expression in join [d.7]===============================================================
# arithmetic expression in union [d.8] # arithmetic expression in union [d.8]===============================================================
# arithmetic expression in group by [d.9] # arithmetic expression in group by [d.9]===============================================================
# in group by tag # in group by tag
# not support for normal table sql select avg(c4)*99 from $stb group by t1
sql_error select c5*99 from $tb group by t1 if $rows != 10 then
return -1
endi
if $data00 != 445.500000000 then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data90 != 445.500000000 then
return -1
endi
if $data91 != 9 then
return -1
endi
# in group by column # in group by column
sql_error select c6-c6+c3*12 from $tb group by c3; sql select apercentile(c6, 50)-first(c6)+last(c5)*12, last(c5)*12 from ca_stb0 group by c2;
if $rows != 10 then
return -1
endi
if $data00 != 0.000000000 then
return -1
endi
sql select first(c6) - last(c6) *12 / count(*) from $tb group by c3; if $data01 != 0.000000000 then
return -1
endi
# limit offset [d.10] if $data10 != 12.000000000 then
sql select c6-c6+12 from $tb limit 12 offset 99; return -1
sql select c4/99.123 from $tb limit 1 offset 9999; endi
# slimit/soffset not suport for normal table query. [d.11] if $data11 != 12.000000000 then
sql_error select sum(c1) from $tb slimit 1 soffset 19; return -1
endi
if $data20 != 24.000000000 then
return -1
endi
if $data21 != 24.000000000 then
return -1
endi
sql_error select first(c6) - last(c6) *12 / count(*) from $stb group by c3;
sql select first(c6) - last(c6) *12 / count(*) from $stb group by c5;
if $rows != 10 then
return -1
endi
if $data00 != 0.000000000 then
return -1
endi
if $data10 != 0.997600000 then
return -1
endi
if $data90 != 8.978400000 then
return -1
endi
# limit offset [d.10]===============================================================
sql select first(c6) - sum(c6) + 12 from $stb limit 12 offset 0;
if $rows != 1 then
return -1
endi
if $data00 != -449988.000000000 then
return -1
endi
sql select apercentile(c4, 21) / 99.123 from $stb limit 1 offset 1;
if $rows != 0 then
return -1
endi
sql select apercentile(c4, 21) / sum(c4) from $stb interval(1s) limit 1 offset 1;
if $rows != 1 then
return -1
endi
# slimit/soffset not support for normal table query. [d.11]===============================================================
sql select sum(c1) from $stb slimit 1 soffset 19;
if $rows != 0 then
return -1
endi
# fill [d.12] sql select sum(c1) from $stb interval(1s) group by tbname slimit 1 soffset 1
sql_error select c2-c2, c3-c4, c5%c6 from $tb fill(value, 12); sql select sum(c1) from ca_stb0 interval(1s) group by tbname slimit 2 soffset 4 limit 10 offset 1
# constant column. [d.13] # fill [d.12]===============================================================
sql_error select first(c1)-last(c1), sum(c3)*count(c3), spread(c5 ) % count(*) from $stb interval(1s) fill(prev);
sql_error select first(c1) from $stb fill(value, 20);
# constant column. [d.13]===============================================================
# column value filter [d.14]
# column value filter [d.14]===============================================================
# tag filter(not support for normal table). [d.15]
sql_error select sum(c2)+99 from $tb where t1=12;
# multi-field output [d.16] # tag filter. [d.15]===============================================================
sql select sum(c2)+99 from $stb where t1=12;
# multi-field output [d.16]===============================================================
sql select count(*), sum(c1)*avg(c2), avg(c3)*count(c3), sum(c3), sum(c4), first(c7), last(c8), first(c9), first(c7), last(c8) from $tb sql select count(*), sum(c1)*avg(c2), avg(c3)*count(c3), sum(c3), sum(c4), first(c7), last(c8), first(c9), first(c7), last(c8) from $tb
sql select c4*1+1/2 from $tb sql select c4*1+1/2 from $tb
...@@ -461,18 +627,30 @@ if $data90 != 9.500000000 then ...@@ -461,18 +627,30 @@ if $data90 != 9.500000000 then
return -1 return -1
endi endi
# interval query [d.17] # interval query [d.17]===============================================================
sql_error select c2*c2, c3-c3, c4+9 from $tb interval(1s) sql select avg(c2)*count(c2), sum(c3)-first(c3), last(c4)+9 from $stb interval(1s)
sql_error select c7-c9 from $tb interval(2y) if $rows != 10000 then
return -1
endi
# aggregation query [d.18] if $data00 != @18-09-17 09:00:00.000@ then
# see test cases below return -1
endi
# first/last query [d.19] sql_error select first(c7)- last(c1) from $tb interval(2y)
# see test cases below
# multiple retrieve [d.20] # aggregation query [d.18]===============================================================
sql select c2-c2 from $tb; # all cases in this part are aggregation query test.
# first/last query [d.19]===============================================================
# multiple retrieve [d.20]===============================================================
sql select c2-c2 from $tb
sql select first(c1)-last(c1), spread(c2), max(c3) - min(c3), avg(c4)*count(c4) from $tb sql select first(c1)-last(c1), spread(c2), max(c3) - min(c3), avg(c4)*count(c4) from $tb
#====================================================super table query==================================================
...@@ -703,13 +703,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t ...@@ -703,13 +703,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 4.499549955 then if $data00 != 4.500000000 then
return -1 return -1
endi endi
if $data02 != 4.499549955 then if $data02 != 4.500000000 then
return -1 return -1
endi endi
if $data05 != 4.499549955 then if $data05 != 4.500000000 then
return -1 return -1
endi endi
...@@ -717,10 +717,12 @@ sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from ...@@ -717,10 +717,12 @@ sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1 sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1
if $rows != 3 then if $rows != 3 then
return -1 return -1
endi endi
if $data01 != 3 then if $data01 != 3 then
return -1 return -1
endi endi
...@@ -731,7 +733,6 @@ if $data23 != 9.00000 then ...@@ -731,7 +733,6 @@ if $data23 != 9.00000 then
return -1 return -1
endi endi
sql select last(c1), last(c2), last(c3), last(c4), last(c5), last(c6) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset 1 sql select last(c1), last(c2), last(c3), last(c4), last(c5), last(c6) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset 1
if $rows != 0 then if $rows != 0 then
return -1 return -1
......
...@@ -327,22 +327,22 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t ...@@ -327,22 +327,22 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 4.000000000 then if $data00 != 4.500000000 then
return -1 return -1
endi endi
if $data01 != 4.000000000 then if $data01 != 4.500000000 then
return -1 return -1
endi endi
if $data02 != 4.000000000 then if $data02 != 4.500000000 then
return -1 return -1
endi endi
if $data03 != 4.000000000 then if $data03 != 4.500000000 then
return -1 return -1
endi endi
if $data04 != 4.000000000 then if $data04 != 4.500000000 then
return -1 return -1
endi endi
if $data05 != 4.000000000 then if $data05 != 4.500000000 then
return -1 return -1
endi endi
...@@ -690,13 +690,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t ...@@ -690,13 +690,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 4.000000000 then if $data00 != 4.500000000 then
return -1 return -1
endi endi
if $data02 != 4.000000000 then if $data02 != 4.500000000 then
return -1 return -1
endi endi
if $data05 != 4.000000000 then if $data05 != 4.500000000 then
return -1 return -1
endi endi
......
...@@ -131,7 +131,6 @@ if $data00 != $rowNum then ...@@ -131,7 +131,6 @@ if $data00 != $rowNum then
return -1 return -1
endi endi
## like ## like
sql_error select * from $mt where c1 like 1 sql_error select * from $mt where c1 like 1
#sql_error select * from $mt where t1 like 1 #sql_error select * from $mt where t1 like 1
...@@ -178,7 +177,8 @@ sql create table wh_mt2_tb1 using wh_mt2 tags ('wh_mt2_tb1') ...@@ -178,7 +177,8 @@ sql create table wh_mt2_tb1 using wh_mt2 tags ('wh_mt2_tb1')
# 2019-01-01 09:00:00.000 1546304400000 # 2019-01-01 09:00:00.000 1546304400000
# 2019-01-01 09:10:00.000 1546305000000 # 2019-01-01 09:10:00.000 1546305000000
sql insert into wh_mt2_tb1 values ('2019-01-01 00:00:00.000', '2019-01-01 09:00:00.000', 'binary10', 'nchar10') sql insert into wh_mt2_tb1 values ('2019-01-01 00:00:00.000', '2019-01-01 09:00:00.000', 'binary10', 'nchar10')
sql insert into wh_mt2_tb1 values ('2019-01-01 00:10:00.000', '2019-01-01 09:10:00.000', 'binary10', 'nchar10') sql insert into wh_mt2_tb1 values ('2019-01-01 00:10:00.000', '2019-01-01 09:10:00.000', 'binary10', 'nchar10')
sql select * from wh_mt2_tb1 where c1 > 1546304400000 sql select * from wh_mt2_tb1 where c1 > 1546304400000
if $rows != 1 then if $rows != 1 then
return -1 return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册