提交 d19b81c9 编写于 作者: S Shuduo Sang

Merge branch 'develop' into feature/sangshuduo/TD-4068-taosdemo-stmt

......@@ -8,6 +8,8 @@ import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
import java.sql.*;
import java.util.ArrayList;
import java.util.Random;
public class TSDBPreparedStatementTest {
private static final String host = "127.0.0.1";
......@@ -97,6 +99,118 @@ public class TSDBPreparedStatementTest {
Assert.assertEquals(1, result);
}
@Test
public void executeTest() throws SQLException {
Statement stmt = conn.createStatement();
int numOfRows = 1000;
for (int loop = 0; loop < 10; loop++){
stmt.execute("drop table if exists weather_test");
stmt.execute("create table weather_test(ts timestamp, f1 nchar(4), f2 float, f3 double, f4 timestamp, f5 int, f6 bool, f7 binary(10))");
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?)");
Random r = new Random();
s.setTableName("weather_test");
ArrayList<Long> ts = new ArrayList<Long>();
for(int i = 0; i < numOfRows; i++) {
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
int random = 10 + r.nextInt(5);
ArrayList<String> s2 = new ArrayList<String>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s2.add(null);
}else{
s2.add("分支" + i % 4);
}
}
s.setNString(1, s2, 4);
random = 10 + r.nextInt(5);
ArrayList<Float> s3 = new ArrayList<Float>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s3.add(null);
}else{
s3.add(r.nextFloat());
}
}
s.setFloat(2, s3);
random = 10 + r.nextInt(5);
ArrayList<Double> s4 = new ArrayList<Double>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s4.add(null);
}else{
s4.add(r.nextDouble());
}
}
s.setDouble(3, s4);
random = 10 + r.nextInt(5);
ArrayList<Long> ts2 = new ArrayList<Long>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
ts2.add(null);
}else{
ts2.add(System.currentTimeMillis() + i);
}
}
s.setTimestamp(4, ts2);
random = 10 + r.nextInt(5);
ArrayList<Integer> vals = new ArrayList<>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
vals.add(null);
}else{
vals.add(r.nextInt());
}
}
s.setInt(5, vals);
random = 10 + r.nextInt(5);
ArrayList<Boolean> sb = new ArrayList<>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
sb.add(null);
}else{
sb.add(i % 2 == 0 ? true : false);
}
}
s.setBoolean(6, sb);
random = 10 + r.nextInt(5);
ArrayList<String> s5 = new ArrayList<String>();
for(int i = 0; i < numOfRows; i++) {
if(i % random == 0) {
s5.add(null);
}else{
s5.add("test" + i % 10);
}
}
s.setString(7, s5, 10);
s.columnDataAddBatch();
s.columnDataExecuteBatch();
s.columnDataCloseBatch();
String sql = "select * from weather_test";
PreparedStatement statement = conn.prepareStatement(sql);
ResultSet rs = statement.executeQuery();
int rows = 0;
while(rs.next()) {
rows++;
}
Assert.assertEquals(numOfRows, rows);
}
}
@Test
public void setBoolean() throws SQLException {
pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
......
......@@ -28,8 +28,9 @@ typedef struct {
int bufBlockSize;
int tBufBlocks;
int nBufBlocks;
int nRecycleBlocks;
int64_t index;
SList* bufBlockList;
SList* bufBlockList;
} STsdbBufPool;
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
......@@ -39,5 +40,7 @@ void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
#endif /* _TD_TSDB_BUFFER_H_ */
\ No newline at end of file
......@@ -71,6 +71,11 @@ struct STsdbRepo {
uint8_t state;
STsdbCfg config;
STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config
STsdbAppH appH;
STsdbStat stat;
STsdbMeta* tsdbMeta;
......
......@@ -70,6 +70,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->index = 0;
pPool->nRecycleBlocks = 0;
for (int i = 0; i < pCfg->totalBlocks; i++) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
......@@ -156,4 +157,46 @@ _err:
return NULL;
}
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
\ No newline at end of file
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpendPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
return TSDB_CODE_SUCCESS;
}
int err = TSDB_CODE_SUCCESS;
if (tsdbLockRepo(pRepo) < 0) return terrno;
STsdbBufPool* pPool = pRepo->pPool;
if (pRepo->config.totalBlocks > oldTotalBlocks) {
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock == NULL) goto err;
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
tsdbFreeBufBlock(pBufBlock);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
err = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto err;
}
pPool->nBufBlocks++;
}
pthread_cond_signal(&pPool->poolNotEmpty);
} else {
pPool->nRecycleBlocks = oldTotalBlocks - pRepo->config.totalBlocks;
}
err:
tsdbUnlockRepo(pRepo);
return err;
}
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
pPool->nBufBlocks--;
}
\ No newline at end of file
......@@ -112,6 +112,32 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) {
return 0;
}
static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
pRepo->config_changed = false;
STsdbCfg * pSaveCfg = &pRepo->save_config;
int32_t oldTotalBlocks = pRepo->config.totalBlocks;
pRepo->config.compression = pRepo->save_config.compression;
pRepo->config.keep = pRepo->save_config.keep;
pRepo->config.keep1 = pRepo->save_config.keep1;
pRepo->config.keep2 = pRepo->save_config.keep2;
pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow;
pRepo->config.totalBlocks = pRepo->save_config.totalBlocks;
tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d),totalBlocks(%d)",
REPO_ID(pRepo),
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->totalBlocks, pSaveCfg->cacheLastRow, pSaveCfg->totalBlocks);
int err = tsdbExpendPool(pRepo, oldTotalBlocks);
if (!TAOS_SUCCEEDED(err)) {
tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s",
REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err));
}
}
static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL;
......@@ -138,6 +164,13 @@ static void *tsdbLoopCommit(void *arg) {
pRepo = ((SCommitReq *)pNode->data)->pRepo;
// check if need to apply new config
if (pRepo->config_changed) {
pthread_mutex_lock(&pRepo->save_mutex);
tsdbApplyRepoConfig(pRepo);
pthread_mutex_unlock(&pRepo->save_mutex);
}
tsdbCommitData(pRepo);
listNodeFree(pNode);
}
......
......@@ -203,6 +203,70 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
STsdbCfg * pRCfg = &repo->config;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
configChanged = true;
}
if (pRCfg->keep != pCfg->keep) {
configChanged = true;
}
if (pRCfg->keep1 != pCfg->keep1) {
configChanged = true;
}
if (pRCfg->keep2 != pCfg->keep2) {
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
configChanged = true;
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
configChanged = true;
}
if (!configChanged) {
tsdbError("vgId:%d no config changed", REPO_ID(repo));
}
int code = pthread_mutex_lock(&repo->save_mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
STsdbCfg * pSaveCfg = &repo->save_config;
*pSaveCfg = repo->config;
pSaveCfg->compression = pCfg->compression;
pSaveCfg->keep = pCfg->keep;
pSaveCfg->keep1 = pCfg->keep1;
pSaveCfg->keep2 = pCfg->keep2;
pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
pSaveCfg->totalBlocks = pCfg->totalBlocks;
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
REPO_ID(repo),
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2,
pRCfg->cacheLastRow, pRCfg->totalBlocks);
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
REPO_ID(repo),
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
repo->config_changed = true;
pthread_mutex_unlock(&repo->save_mutex);
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
......@@ -474,6 +538,14 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL;
}
code = pthread_mutex_init(&(pRepo->save_mutex), NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
tsdbFreeRepo(pRepo);
return NULL;
}
pRepo->config_changed = false;
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) {
code = errno;
......
......@@ -98,17 +98,26 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL;
bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
tdListAppendNode(pBufPool->bufBlockList, pNode);
}
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode);
pBufPool->nRecycleBlocks -= 1;
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
}
}
if (!recycleBlocks) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
}
if (tsdbUnlockRepo(pRepo) < 0) return -1;
for (int i = 0; i < pMemTable->maxTables; i++) {
......@@ -958,6 +967,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
STsdbCfg *pCfg = &pRepo->config;
// if cacheLastRow config has been reset, free the lastRow
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = NULL;
pTable->lastKey = TSKEY_INITIAL_VAL;
TSDB_WUNLOCK_TABLE(pTable);
}
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
SDataRow nrow = pTable->lastRow;
......
......@@ -170,29 +170,31 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
vDebug("vgId:%d, tsdbchanged:%d syncchanged:%d while alter vnode", pVnode->vgId, tsdbCfgChanged, syncCfgChanged);
if (/*tsdbCfgChanged || */syncCfgChanged) {
if (tsdbCfgChanged || syncCfgChanged) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// dbCfgVersion can be corrected by status msg
if (!vnodeSetUpdatingStatus(pVnode)) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
pVnode->dbCfgVersion = dbCfgVersion;
pVnode->vgCfgVersion = vgCfgVersion;
pVnode->syncCfg = syncCfg;
pVnode->tsdbCfg = tsdbCfg;
return TSDB_CODE_SUCCESS;
}
if (syncCfgChanged) {
if (!vnodeSetUpdatingStatus(pVnode)) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
pVnode->dbCfgVersion = dbCfgVersion;
pVnode->vgCfgVersion = vgCfgVersion;
pVnode->syncCfg = syncCfg;
pVnode->tsdbCfg = tsdbCfg;
return TSDB_CODE_SUCCESS;
}
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->dbCfgVersion = dbCfgVersion;
pVnode->vgCfgVersion = vgCfgVersion;
pVnode->syncCfg = syncCfg;
pVnode->tsdbCfg = tsdbCfg;
vnodeSetReadyStatus(pVnode);
return code;
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->dbCfgVersion = dbCfgVersion;
pVnode->vgCfgVersion = vgCfgVersion;
pVnode->syncCfg = syncCfg;
pVnode->tsdbCfg = tsdbCfg;
vnodeSetReadyStatus(pVnode);
return code;
}
}
if (pVnode->tsdb) {
if (tsdbCfgChanged && pVnode->tsdb) {
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) {
pVnode->dbCfgVersion = dbCfgVersion;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册