提交 4d766449 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch '3.0' into docs/sangshuduo/update-readme

...@@ -6,39 +6,32 @@ import java.sql.Connection; ...@@ -6,39 +6,32 @@ import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
public class StmtInsertExample { public class StmtInsertExample {
private static ArrayList<Long> tsToLongArray(String ts) { private static String datePattern = "yyyy-MM-dd HH:mm:ss.SSS";
ArrayList<Long> result = new ArrayList<>(); private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
LocalDateTime localDateTime = LocalDateTime.parse(ts, formatter);
result.add(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
return result;
}
private static <T> ArrayList<T> toArray(T v) {
ArrayList<T> result = new ArrayList<>();
result.add(v);
return result;
}
private static List<String> getRawData() { private static List<String> getRawData(int size) {
return Arrays.asList( SimpleDateFormat format = new SimpleDateFormat(datePattern);
"d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,California.SanFrancisco,2", List<String> result = new ArrayList<>();
"d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,California.SanFrancisco,2", long current = System.currentTimeMillis();
"d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,California.SanFrancisco,2", Random random = new Random();
"d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,California.SanFrancisco,3", for (int i = 0; i < size; i++) {
"d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,California.LosAngeles,2", String time = format.format(current + i);
"d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,California.LosAngeles,2", int id = random.nextInt(10);
"d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,California.LosAngeles,3", result.add("d" + id + "," + time + ",10.30000,219,0.31000,California.SanFrancisco,2");
"d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,California.LosAngeles,3" }
); return result.stream()
.sorted(Comparator.comparing(s -> s.split(",")[0])).collect(Collectors.toList());
} }
private static Connection getConnection() throws SQLException { private static Connection getConnection() throws SQLException {
...@@ -48,9 +41,9 @@ public class StmtInsertExample { ...@@ -48,9 +41,9 @@ public class StmtInsertExample {
private static void createTable(Connection conn) throws SQLException { private static void createTable(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE power KEEP 3650"); stmt.execute("CREATE DATABASE if not exists power KEEP 3650");
stmt.executeUpdate("USE power"); stmt.executeUpdate("use power");
stmt.execute("CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + stmt.execute("CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
"TAGS (location BINARY(64), groupId INT)"); "TAGS (location BINARY(64), groupId INT)");
} }
} }
...@@ -58,21 +51,54 @@ public class StmtInsertExample { ...@@ -58,21 +51,54 @@ public class StmtInsertExample {
private static void insertData() throws SQLException { private static void insertData() throws SQLException {
try (Connection conn = getConnection()) { try (Connection conn = getConnection()) {
createTable(conn); createTable(conn);
String psql = "INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)"; String psql = "INSERT INTO ? USING power.meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) { try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) {
for (String line : getRawData()) { String tableName = null;
ArrayList<Long> ts = new ArrayList<>();
ArrayList<Float> current = new ArrayList<>();
ArrayList<Integer> voltage = new ArrayList<>();
ArrayList<Float> phase = new ArrayList<>();
for (String line : getRawData(100000)) {
String[] ps = line.split(","); String[] ps = line.split(",");
// bind table name and tags if (tableName == null) {
pst.setTableName(ps[0]); // bind table name and tags
pst.setTagString(0, ps[5]); tableName = "power." + ps[0];
pst.setTagInt(1, Integer.valueOf(ps[6])); pst.setTableName(ps[0]);
pst.setTagString(0, ps[5]);
pst.setTagInt(1, Integer.valueOf(ps[6]));
} else {
if (!tableName.equals(ps[0])) {
pst.setTimestamp(0, ts);
pst.setFloat(1, current);
pst.setInt(2, voltage);
pst.setFloat(3, phase);
pst.columnDataAddBatch();
pst.columnDataExecuteBatch();
// bind table name and tags
tableName = ps[0];
pst.setTableName(ps[0]);
pst.setTagString(0, ps[5]);
pst.setTagInt(1, Integer.valueOf(ps[6]));
ts.clear();
current.clear();
voltage.clear();
phase.clear();
}
}
// bind values // bind values
pst.setTimestamp(0, tsToLongArray(ps[1])); //ps[1] looks like: 2018-10-03 14:38:05.000 // ps[1] looks like: 2018-10-03 14:38:05.000
pst.setFloat(1, toArray(Float.valueOf(ps[2]))); LocalDateTime localDateTime = LocalDateTime.parse(ps[1], formatter);
pst.setInt(2, toArray(Integer.valueOf(ps[3]))); ts.add(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
pst.setFloat(3, toArray(Float.valueOf(ps[4]))); current.add(Float.valueOf(ps[2]));
pst.columnDataAddBatch(); voltage.add(Integer.valueOf(ps[3]));
phase.add(Float.valueOf(ps[4]));
} }
pst.setTimestamp(0, ts);
pst.setFloat(1, current);
pst.setInt(2, voltage);
pst.setFloat(3, phase);
pst.columnDataAddBatch();
pst.columnDataExecuteBatch(); pst.columnDataExecuteBatch();
} }
} }
......
...@@ -429,7 +429,7 @@ if [ "$exitcode" != "0" ]; then ...@@ -429,7 +429,7 @@ if [ "$exitcode" != "0" ]; then
exit $exitcode exit $exitcode
fi fi
if [ -n "${taostools_bin_files}" ]; then if [ -n "${taostools_bin_files}" ] && [ "$verMode" != "cloud" ]; then
wget https://github.com/taosdata/grafanaplugin/releases/latest/download/TDinsight.sh -O ${taostools_install_dir}/bin/TDinsight.sh && echo "TDinsight.sh downloaded!"|| echo "failed to download TDinsight.sh" wget https://github.com/taosdata/grafanaplugin/releases/latest/download/TDinsight.sh -O ${taostools_install_dir}/bin/TDinsight.sh && echo "TDinsight.sh downloaded!"|| echo "failed to download TDinsight.sh"
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
tar -zcv -f "$(basename ${taostools_pkg_name}).tar.gz" "$(basename ${taostools_install_dir})" --remove-files || : tar -zcv -f "$(basename ${taostools_pkg_name}).tar.gz" "$(basename ${taostools_install_dir})" --remove-files || :
......
...@@ -1970,7 +1970,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1970,7 +1970,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s %d|", flag, j); len += snprintf(dumpBuf + len, size - len, "%s|", flag);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {
...@@ -2053,7 +2053,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -2053,7 +2053,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
} break; } break;
} }
} }
len += snprintf(dumpBuf + len, size - len, "\n"); len += snprintf(dumpBuf + len, size - len, "%d\n", j);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
} }
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
......
...@@ -404,16 +404,47 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -404,16 +404,47 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
if (!COL_VAL_IS_NONE(pColVal)) { // if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value);
}
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen); rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
pLastCol = (SLastCol *)value; pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
...@@ -434,39 +465,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -434,39 +465,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(value); taosMemoryFree(value);
} }
if (COL_VAL_IS_VALUE(pColVal)) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
pLastCol = (SLastCol *)value;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge,
tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
taosMemoryFree(value);
}
}
} }
//}
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
...@@ -474,7 +474,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -474,7 +474,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);
rocksMayWrite(pTsdb, false, false, false); rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
_exit: _exit:
...@@ -3010,17 +3010,17 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, ...@@ -3010,17 +3010,17 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} }
if (COL_VAL_IS_NONE(pColVal)) { /*if (COL_VAL_IS_NONE(pColVal)) {
if (!setNoneCol) { if (!setNoneCol) {
noneCol = iCol; noneCol = iCol;
setNoneCol = true; setNoneCol = true;
} }
} else { } else {*/
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) { if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex); taosArrayRemove(aColArray, aColIndex);
}
} }
//}
} }
if (!setNoneCol) { if (!setNoneCol) {
// done, goto return pColArray // done, goto return pColArray
......
...@@ -343,6 +343,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -343,6 +343,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
hasRes = true; hasRes = true;
p->ts = pColVal->ts; p->ts = pColVal->ts;
if (k == 0) {
if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
singleTableLastTs = pColVal->ts; singleTableLastTs = pColVal->ts;
} }
...@@ -373,12 +381,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -373,12 +381,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} }
if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
// taosArrayClearEx(pRow, freeItem); // taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow); taosArrayClear(pRow);
} }
......
...@@ -448,15 +448,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx ...@@ -448,15 +448,17 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true; return true;
} }
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) { bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) { if (pInterval->interval != pInterval->sliding &&
((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart) )) {
return false; return false;
} }
return true; return true;
} }
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey); return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
} }
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
...@@ -1365,7 +1367,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa ...@@ -1365,7 +1367,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
} }
do { do {
if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) { if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i], pBlock->info.type)) {
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
continue; continue;
} }
......
...@@ -130,6 +130,18 @@ class TDDnode: ...@@ -130,6 +130,18 @@ class TDDnode:
"locale": "en_US.UTF-8", "locale": "en_US.UTF-8",
"charset": "UTF-8", "charset": "UTF-8",
"asyncLog": "0", "asyncLog": "0",
"mDebugFlag": "143",
"dDebugFlag": "143",
"vDebugFlag": "143",
"tqDebugFlag": "143",
"cDebugFlag": "143",
"jniDebugFlag": "143",
"qDebugFlag": "143",
"rpcDebugFlag": "143",
"tmrDebugFlag": "131",
"uDebugFlag": "143",
"sDebugFlag": "143",
"wDebugFlag": "143",
"numOfLogLines": "100000000", "numOfLogLines": "100000000",
"statusInterval": "1", "statusInterval": "1",
"enableQueryHb": "1", "enableQueryHb": "1",
......
/home/ubuntu/Documents/github/cadem/emptydebug/forcedrop/
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册