提交 1ab0f30a 编写于 作者: S Shengliang Guan

merge from master

IF (TD_LINUX) IF (TD_LINUX)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
IF (TD_POWER) IF (TD_POWER)
SET(CMAKE_INSTALL_PREFIX C:/PowerDB) SET(CMAKE_INSTALL_PREFIX C:/PowerDB)
...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS) ...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS)
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
ENDIF () ENDIF ()
...@@ -3567,8 +3567,10 @@ static int postProceSql(char *host, uint16_t port, ...@@ -3567,8 +3567,10 @@ static int postProceSql(char *host, uint16_t port,
break; break;
received += bytes; received += bytes;
response_buf[RESP_BUF_LEN - 1] = '\0'; verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
response_buf[RESP_BUF_LEN - 1] = '\0';
if (strlen(response_buf)) { if (strlen(response_buf)) {
verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf); __func__, __LINE__, received, resp_len, response_buf);
...@@ -6611,7 +6613,6 @@ static int getRowDataFromSample( ...@@ -6611,7 +6613,6 @@ static int getRowDataFromSample(
stbInfo->sampleDataBuf stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos)); + stbInfo->lenOfOneRow * (*sampleUsePos));
} }
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++; (*sampleUsePos)++;
...@@ -11211,7 +11212,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -11211,7 +11212,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
} }
*/ */
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS #ifdef WINDOWS
WSADATA wsaData; WSADATA wsaData;
...@@ -11237,7 +11237,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -11237,7 +11237,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->sockfd = sockfd; pThreadInfo->sockfd = sockfd;
} }
tsem_init(&(pThreadInfo->lock_sem), 0, 0); tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) { if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
......
...@@ -41,6 +41,7 @@ typedef struct STable { ...@@ -41,6 +41,7 @@ typedef struct STable {
int16_t restoreColumnNum; int16_t restoreColumnNum;
bool hasRestoreLastColumn; bool hasRestoreLastColumn;
int lastColSVersion; int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE() T_REF_DECLARE()
} STable; } STable;
......
...@@ -79,8 +79,8 @@ struct STsdbRepo { ...@@ -79,8 +79,8 @@ struct STsdbRepo {
STsdbCfg save_config; // save apply config STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastColumn; int16_t cacheLastConfigVersion;
STsdbAppH appH; STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
...@@ -111,7 +111,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); ...@@ -111,7 +111,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
......
...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { ...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
if (tsdbLockRepo(pRepo) < 0) return; if (tsdbLockRepo(pRepo) < 0) return;
tsdbCacheLastData(pRepo, &oldCfg); // tsdbCacheLastData(pRepo, &oldCfg);
// lazy load last cache when query or update
++pRepo->cacheLastConfigVersion;
tsdbUnlockRepo(pRepo); tsdbUnlockRepo(pRepo);
} }
......
...@@ -576,7 +576,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -576,7 +576,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->config_changed = false; pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastColumn, 0); pRepo->cacheLastConfigVersion = 0;
code = tsem_init(&(pRepo->readyToCommit), 0, 1); code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
...@@ -726,7 +726,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -726,7 +726,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
loadStatisData = true; loadStatisData = true;
} }
} }
TSDB_WLOCK_TABLE(pTable); // lock when update pTable->lastCols[]
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns // ignore loaded columns
...@@ -775,6 +775,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -775,6 +775,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
break; break;
} }
} }
TSDB_WUNLOCK_TABLE(pTable);
} }
out: out:
...@@ -803,21 +804,32 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, ...@@ -803,21 +804,32 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
// Get the data in row // Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); SMemRow lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) { if (lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
memRowSetType(pTable->lastRow, SMEM_ROW_DATA); memRowSetType(lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema); tdInitDataRow(memRowDataBody(lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) { for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol); STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, tdAppendColVal(memRowDataBody(lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset); pCol->offset);
} }
pTable->lastKey = memRowKey(pTable->lastRow); TSKEY lastKey = memRowKey(lastRow);
// during the load data in file, new data would be inserted and last row has been updated
TSDB_WLOCK_TABLE(pTable);
if (pTable->lastRow == NULL) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
TSDB_WUNLOCK_TABLE(pTable);
} else {
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(lastRow);
}
return 0; return 0;
} }
...@@ -889,14 +901,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -889,14 +901,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (CACHE_LAST_NULL_COLUMN(pCfg)) { // if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// }
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
if (!cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
} }
if (!cacheLastCol && pTable->lastCols != NULL) {
tsdbFreeLastColumns(pTable);
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) {
return 0;
}
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbRLockFS(REPO_FS(pRepo));
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((cacheLastRowTableNum > 0 || cacheLastColTableNum > 0) && (pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
// tsdbDebug("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && (cacheLastColTableNum > 0) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return 0; return 0;
} }
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false; bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
...@@ -930,9 +1033,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -930,9 +1033,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
// if close last option,need to free data // if close last option,need to free data
if (need_free_last_row || need_free_last_col) { if (need_free_last_row || need_free_last_col) {
if (need_free_last_col) { // if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0); // atomic_store_8(&pRepo->hasCachedLastColumn, 0);
} // }
tsdbInfo("free cache last data since cacheLast option changed"); tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i <= maxTableIdx; i++) { for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
...@@ -1010,9 +1113,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -1010,9 +1113,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (cacheLastCol) { // if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
} // }
return 0; return 0;
} }
...@@ -1001,7 +1001,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1001,7 +1001,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
if ((value == NULL) || isNull(value, pTCol->type)) { if ((value == NULL) || isNull(value, pTCol->type)) {
continue; continue;
} }
// lock
TSDB_WLOCK_TABLE(pTable);
SDataCol *pDataCol = &(pLatestCols[idx]); SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) { if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pTCol->bytes); pDataCol->pData = malloc(pTCol->bytes);
...@@ -1017,6 +1018,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1017,6 +1018,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
memcpy(pDataCol->pData, value, bytes); memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts = memRowKey(row); pDataCol->ts = memRowKey(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
} }
} }
...@@ -1063,5 +1066,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r ...@@ -1063,5 +1066,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
updateTableLatestColumn(pRepo, pTable, row); updateTableLatestColumn(pRepo, pTable, row);
} }
} }
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
return 0; return 0;
} }
...@@ -639,27 +639,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { ...@@ -639,27 +639,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
} }
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
ASSERT(pTable->lastCols == NULL); TSDB_WLOCK_TABLE(pTable);
if (pTable->lastCols == NULL) {
int16_t numOfColumn = pSchema->numOfCols;
int16_t numOfColumn = pSchema->numOfCols; pTable->lastCols = (SDataCol *)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
TSDB_WUNLOCK_TABLE(pTable);
return -1;
}
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); for (int16_t i = 0; i < numOfColumn; ++i) {
if (pTable->lastCols == NULL) { STColumn *pCol = schemaColAt(pSchema, i);
return -1; SDataCol *pDataCol = &(pTable->lastCols[i]);
} pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
}
for (int16_t i = 0; i < numOfColumn; ++i) { pTable->lastColSVersion = schemaVersion(pSchema);
STColumn *pCol = schemaColAt(pSchema, i); pTable->maxColNum = numOfColumn;
SDataCol* pDataCol = &(pTable->lastCols[i]); pTable->restoreColumnNum = 0;
pDataCol->bytes = 0; pTable->hasRestoreLastColumn = false;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
} }
TSDB_WUNLOCK_TABLE(pTable);
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->maxColNum = numOfColumn;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
return 0; return 0;
} }
...@@ -809,6 +812,7 @@ static STable *tsdbNewTable() { ...@@ -809,6 +812,7 @@ static STable *tsdbNewTable() {
pTable->lastCols = NULL; pTable->lastCols = NULL;
pTable->restoreColumnNum = 0; pTable->restoreColumnNum = 0;
pTable->cacheLastConfigVersion = 0;
pTable->maxColNum = 0; pTable->maxColNum = 0;
pTable->hasRestoreLastColumn = false; pTable->hasRestoreLastColumn = false;
pTable->lastColSVersion = -1; pTable->lastColSVersion = -1;
......
...@@ -157,6 +157,7 @@ typedef struct STableGroupSupporter { ...@@ -157,6 +157,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
...@@ -591,6 +592,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -591,6 +592,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
} }
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t code = 0;
for (size_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
code = tsdbLoadLastCache(pRepo, pTable);
if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno));
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
...@@ -604,6 +627,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -604,6 +627,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList); int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -618,13 +643,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -618,13 +643,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return pQueryHandle; return pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLast(pQueryHandle); int32_t code = checkForCachedLast(pQueryHandle);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -2758,6 +2784,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2758,6 +2784,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
} }
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
// lock pTable->lastCols[i] as it would be released when schema update(tsdbUpdateLastColSchema)
TSDB_RLOCK_TABLE(pTable);
while(i < tgNumOfCols && j < numOfCols) { while(i < tgNumOfCols && j < numOfCols) {
pColInfo = taosArrayGet(pQueryHandle->pColumns, i); pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pTable->lastCols[j].colId < pColInfo->info.colId) { if (pTable->lastCols[j].colId < pColInfo->info.colId) {
...@@ -2844,6 +2873,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2844,6 +2873,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
i++; i++;
j++; j++;
} }
TSDB_RUNLOCK_TABLE(pTable);
// leave the real ts column as the last row, because last function only (not stable) use the last row as res // leave the real ts column as the last row, because last function only (not stable) use the last row as res
if (priKey != TSKEY_INITIAL_VAL) { if (priKey != TSKEY_INITIAL_VAL) {
...@@ -3175,7 +3205,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -3175,7 +3205,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0; int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){ STsdbRepo* pRepo = pQueryHandle->pTsdb;
if (pRepo && CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
} }
......
...@@ -54,29 +54,36 @@ class TDTestCase: ...@@ -54,29 +54,36 @@ class TDTestCase:
binPath = buildPath + "/build/bin/" binPath = buildPath + "/build/bin/"
if(threadID == 0): if(threadID == 0):
print("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT -m t" % print("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords)) (binPath, self.numberOfTables, self.numberOfRecords))
os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" % os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords)) (binPath, self.numberOfTables, self.numberOfRecords))
if(threadID == 1): if(threadID == 1):
time.sleep(2) time.sleep(2)
print("use test") print("use test")
while True: max_try = 100
count = 0
while (count < max_try):
try: try:
tdSql.execute("use test") tdSql.execute("use test")
break break
except Exception as e: except Exception as e:
tdLog.info("use database test failed") tdLog.info("use database test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
# check if all the tables have heen created # check if all the tables have heen created
while True: count = 0
while (count < max_try):
try: try:
tdSql.query("show tables") tdSql.query("show tables")
except Exception as e: except Exception as e:
tdLog.info("show tables test failed") tdLog.info("show tables test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -85,13 +92,17 @@ class TDTestCase: ...@@ -85,13 +92,17 @@ class TDTestCase:
break break
time.sleep(1) time.sleep(1)
# check if there are any records in the last created table # check if there are any records in the last created table
while True: count = 0
while (count < max_try):
print("query started") print("query started")
print("try %d times" % count)
try: try:
tdSql.query("select * from test.t7") tdSql.query("select * from test.d7")
except Exception as e: except Exception as e:
tdLog.info("select * test failed") tdLog.info("select * test failed")
time.sleep(2) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -102,8 +113,8 @@ class TDTestCase: ...@@ -102,8 +113,8 @@ class TDTestCase:
print("alter table test.meters add column c10 int") print("alter table test.meters add column c10 int")
tdSql.execute("alter table test.meters add column c10 int") tdSql.execute("alter table test.meters add column c10 int")
print("insert into test.t7 values (now, 1, 2, 3, 4, 0)") print("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
tdSql.execute("insert into test.t7 values (now, 1, 2, 3, 4, 0)") tdSql.execute("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册