提交 59c32df4 编写于 作者: H Haojun Liao

[TD-225] merge develop

......@@ -4,6 +4,9 @@
[submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
\ No newline at end of file
......@@ -61,7 +61,7 @@ The use of each configuration item is:
* **port**: This is the `http` service port which enables other application to manage rules by `restful API`.
* **database**: rules are stored in a `sqlite` database, this is the path of the database file (if the file does not exist, the alert application creates it automatically).
* **tdengine**: connection string of `TDEngine` server, note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **tdengine**: connection string of `TDEngine` server (please refer the documentation of GO connector for the detailed format of this string), note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **log > level**: log level, could be `production` or `debug`.
* **log > path**: log output file path.
* **receivers > alertManager**: the alert application pushes alerts to `AlertManager` at this URL.
......
......@@ -58,7 +58,7 @@ $ go build
* **port**:报警监测程序支持使用 `restful API` 对规则进行管理,这个参数用于配置 `http` 服务的侦听端口。
* **database**:报警监测程序将规则保存到了一个 `sqlite` 数据库中,这个参数用于指定数据库文件的路径(不需要提前创建这个文件,如果它不存在,程序会自动创建它)。
* **tdengine**`TDEngine` 的连接字符串,一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
* **tdengine**`TDEngine` 的连接字符串(这个字符串的详细格式说明请见 GO 连接器的文档),一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
* **log > level**:日志的记录级别,可选 `production``debug`
* **log > path**:日志文件的路径。
* **receivers > alertManager**:报警监测程序会将报警推送到 `AlertManager`,在这里指定 `AlertManager` 的接收地址。
......
......@@ -84,6 +84,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
case firing && (alert.State == AlertStateWaiting):
alert.StartsAt = time.Now()
alert.EndsAt = time.Time{}
if rule.For.Nanoseconds() > 0 {
alert.State = AlertStatePending
return false
......@@ -95,6 +96,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
return false
}
alert.StartsAt = alert.StartsAt.Add(rule.For.Duration)
alert.EndsAt = time.Time{}
alert.State = AlertStateFiring
case firing && (alert.State == AlertStateFiring):
......
......@@ -84,9 +84,9 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, uint32_t nBufferSize);
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
......
......@@ -282,6 +282,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -2695,17 +2695,18 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) {
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
SHistogramInfo * pHisto = pOutput->pHisto;
SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else {
//TODO(dengyihao): avoid memcpy
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
tHistogramDestroy(&pOutput->pHisto);
pOutput->pHisto = pRes;
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......
......@@ -618,7 +618,11 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName);
}
......@@ -641,7 +645,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
......@@ -651,7 +659,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = numOfRows; i < totalRows; i++) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
......
......@@ -172,14 +172,14 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
......@@ -197,7 +197,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
}
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscDebug("%p retrieved no data", pSql);
return;
}
......@@ -206,7 +206,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
......@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pReducer == NULL) {
tscError("%p failed to create local merge structure, out of memory", pSql);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
......@@ -334,6 +334,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel;
assert(pReducer->finalRowSize > 0);
if (pReducer->finalRowSize > 0) {
pReducer->resColModel->capacity /= pReducer->finalRowSize;
......@@ -531,7 +533,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData);
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel,
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel,
pLocalReducer->numOfVnode);
for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
tfree(pLocalReducer->pLocalDataSrc[i]);
......@@ -655,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, uint32_t nBufferSizes) {
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -753,6 +755,18 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
memset(pSchema, 0, sizeof(SSchema) * size);
size = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < size; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
pSchema[i].bytes = pField->field.bytes;
pSchema[i].type = pField->field.type;
tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
}
*pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);
tfree(pSchema);
return TSDB_CODE_SUCCESS;
}
......@@ -763,9 +777,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
* @param pFinalModel
* @param numOfVnodes
*/
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
int32_t numOfVnodes) {
destroyColumnModel(pFinalModel);
destroyColumnModel(pFFModel);
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
......@@ -873,10 +889,10 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
......@@ -898,7 +914,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
......@@ -1240,7 +1256,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
}
#ifdef _DEBUG_VIEW
......@@ -1610,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->data = pRes->pLocalReducer->pResultBuf->data;
}
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo);
......@@ -1645,8 +1661,10 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r
}
assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize);
memcpy(pOutput->data, pbuf, pOutput->num * offset);
tfree(pbuf);
tfree(arithSup.data);
return offset;
}
\ No newline at end of file
......@@ -1148,6 +1148,10 @@ int tsParseInsertSql(SSqlObj *pSql) {
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error;
}
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
......
......@@ -547,7 +547,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
......@@ -787,8 +787,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
if(tscIsSecondStageQuery(pQueryInfo)) {
size_t output = tscNumOfFields(pQueryInfo);
size_t output = tscNumOfFields(pQueryInfo);
if ((tscIsSecondStageQuery(pQueryInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) ||
UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) && (output != tscSqlExprNumOfExprs(pQueryInfo))) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
......
......@@ -1644,6 +1644,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
......@@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
......@@ -1677,7 +1678,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql);
return ret;
......@@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
trs->pFFColModel = pFinalModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
......@@ -1730,13 +1732,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
......@@ -1876,7 +1878,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code));
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
pState->numOfSub);
tscFreeRetrieveSup(pSql);
......
......@@ -56,6 +56,23 @@
<scope>test</scope>
</dependency>
<!-- for restful -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.taosdata.jdbc;
import java.io.*;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
public abstract class AbstractTaosDriver implements Driver {
private static final String TAOS_CFG_FILENAME = "taos.cfg";
/**
* @param cfgDirPath
* @return return the config dir
**/
protected File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
protected File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
protected List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
protected void loadTaosConfig(Properties info) {
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
}
protected DriverPropertyInfo[] getPropertyInfo(Properties info) {
DriverPropertyInfo hostProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_HOST, info.getProperty(TSDBDriver.PROPERTY_KEY_HOST));
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PORT, info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_DBNAME, info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_USER, info.getProperty(TSDBDriver.PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PASSWORD, info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
}
protected Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
// parse properties
int beginningOfSlashes = url.indexOf("//");
int index = url.indexOf("?");
if (index != -1) {
String paramString = url.substring(index + 1, url.length());
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
if (indexOfSlash + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
if (indexOfColon + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
}
url = url.substring(0, indexOfColon);
}
// parse host
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
return urlProps;
}
}
......@@ -197,7 +197,8 @@ public class TSDBConnection implements Connection {
public SQLWarning getWarnings() throws SQLException {
//todo: implement getWarnings according to the warning messages returned from TDengine
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return null;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
public void clearWarnings() throws SQLException {
......
......@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.*;
import java.sql.*;
import java.util.*;
import java.util.logging.Logger;
......@@ -38,7 +37,7 @@ import java.util.logging.Logger;
* register it with the DriverManager. This means that a user can load and
* register a driver by doing Class.forName("foo.bah.Driver")
*/
public class TSDBDriver implements java.sql.Driver {
public class TSDBDriver extends AbstractTaosDriver {
@Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://";
......@@ -97,50 +96,6 @@ public class TSDBDriver implements java.sql.Driver {
}
}
private List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
/**
* @param cfgDirPath
* @return return the config dir
**/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
public Connection connect(String url, Properties info) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
......@@ -152,26 +107,12 @@ public class TSDBDriver implements java.sql.Driver {
if ((props = parseURL(url, info)) == null) {
return null;
}
//load taos.cfg start
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
loadTaosConfig(info);
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR),
(String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLWarning sqlWarning) {
......@@ -208,39 +149,13 @@ public class TSDBDriver implements java.sql.Driver {
info = parseURL(url, info);
}
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
return getPropertyInfo(info);
}
/**
* example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password
*/
@Override
public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null || url.length() <= 0 || url.trim().length() <= 0)
......@@ -296,86 +211,10 @@ public class TSDBDriver implements java.sql.Driver {
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
/*
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
url = url.substring(beginningOfSlashes + 2);
String host = url.substring(0, url.indexOf(":"));
url = url.substring(url.indexOf(":") + 1);
urlProps.setProperty(PROPERTY_KEY_HOST, host);
String port = url.substring(0, url.indexOf("/"));
urlProps.setProperty(PROPERTY_KEY_PORT, port);
url = url.substring(url.indexOf("/") + 1);
if (url.indexOf("?") != -1) {
String dbName = url.substring(0, url.indexOf("?"));
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
url = url.trim().substring(url.indexOf("?") + 1);
} else {
// without user & password so return
if (!url.trim().isEmpty()) {
String dbName = url.trim();
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
return urlProps;
}
String user = "";
if (url.indexOf("&") == -1) {
String[] kvPair = url.trim().split("=");
if (kvPair.length == 2) {
setPropertyValue(urlProps, kvPair);
return urlProps;
}
}
String[] queryStrings = url.trim().split("&");
for (String queryStr : queryStrings) {
String[] kvPair = queryStr.trim().split("=");
if (kvPair.length < 2) {
continue;
}
setPropertyValue(urlProps, kvPair);
}
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
*/
return urlProps;
}
private void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
break;
case PROPERTY_KEY_PASSWORD:
property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
break;
case PROPERTY_KEY_TIME_ZONE:
property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
break;
case PROPERTY_KEY_LOCALE:
property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
break;
case PROPERTY_KEY_CHARSET:
property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
break;
case PROPERTY_KEY_CONFIG_DIR:
property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
break;
}
}
public int getMajorVersion() {
return 2;
}
......
package com.taosdata.jdbc.rs;
import com.taosdata.jdbc.TSDBConstants;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
public class RestfulConnection implements Connection {
private final String host;
private final int port;
private final Properties props;
private final String database;
private final String url;
public RestfulConnection(String host, String port, Properties props, String database, String url) {
this.host = host;
this.port = Integer.parseInt(port);
this.props = props;
this.database = database;
this.url = url;
}
@Override
public Statement createStatement() throws SQLException {
if (isClosed())
throw new SQLException(TSDBConstants.WrapErrMsg("restful TDengine connection is closed."));
return new RestfulStatement(this, this.database);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
}
@Override
public void commit() throws SQLException {
}
@Override
public void rollback() throws SQLException {
}
@Override
public void close() throws SQLException {
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
//TODO: RestfulDatabaseMetaData is not implemented
return new RestfulDatabaseMetaData();
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public void setCatalog(String catalog) throws SQLException {
}
@Override
public String getCatalog() throws SQLException {
return null;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
}
@Override
public int getTransactionIsolation() throws SQLException {
return 0;
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
}
@Override
public void setHoldability(int holdability) throws SQLException {
}
@Override
public int getHoldability() throws SQLException {
return 0;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
}
@Override
public Clob createClob() throws SQLException {
return null;
}
@Override
public Blob createBlob() throws SQLException {
return null;
}
@Override
public NClob createNClob() throws SQLException {
return null;
}
@Override
public SQLXML createSQLXML() throws SQLException {
return null;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return null;
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return null;
}
@Override
public void setSchema(String schema) throws SQLException {
}
@Override
public String getSchema() throws SQLException {
return null;
}
@Override
public void abort(Executor executor) throws SQLException {
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
}
@Override
public int getNetworkTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public Properties getProps() {
return props;
}
public String getDatabase() {
return database;
}
public String getUrl() {
return url;
}
}
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.AbstractTaosDriver;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Properties;
import java.util.logging.Logger;
public class RestfulDriver extends AbstractTaosDriver {
private static final String URL_PREFIX = "jdbc:TAOS-RS://";
static {
try {
DriverManager.registerDriver(new RestfulDriver());
} catch (SQLException e) {
throw new RuntimeException(TSDBConstants.WrapErrMsg("can not register Restful JDBC driver"), e);
}
}
@Override
public Connection connect(String url, Properties info) throws SQLException {
// throw SQLException if url is null
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
// return null if url is not be accepted
if (!acceptsURL(url))
return null;
Properties props = parseURL(url, info);
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME);
String loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + "/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD) + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
}
return new RestfulConnection(host, port, props, database, url);
}
@Override
public boolean acceptsURL(String url) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is null"));
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.startsWith(URL_PREFIX);
}
@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
if (acceptsURL(url)) {
info = parseURL(url, info);
}
return getPropertyInfo(info);
}
@Override
public int getMajorVersion() {
return 2;
}
@Override
public int getMinorVersion() {
return 0;
}
@Override
public boolean jdbcCompliant() {
return false;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
}
package com.taosdata.jdbc.rs;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
public class RestfulResultSetMetaData implements ResultSetMetaData {
private List<String> fields;
public RestfulResultSetMetaData(List<String> fields) {
this.fields = fields;
}
@Override
public int getColumnCount() throws SQLException {
return fields.size();
}
@Override
public boolean isAutoIncrement(int column) throws SQLException {
return false;
}
@Override
public boolean isCaseSensitive(int column) throws SQLException {
return false;
}
@Override
public boolean isSearchable(int column) throws SQLException {
return false;
}
@Override
public boolean isCurrency(int column) throws SQLException {
return false;
}
@Override
public int isNullable(int column) throws SQLException {
return 0;
}
@Override
public boolean isSigned(int column) throws SQLException {
return false;
}
@Override
public int getColumnDisplaySize(int column) throws SQLException {
return 0;
}
@Override
public String getColumnLabel(int column) throws SQLException {
return fields.get(column - 1);
}
@Override
public String getColumnName(int column) throws SQLException {
return null;
}
@Override
public String getSchemaName(int column) throws SQLException {
return null;
}
@Override
public int getPrecision(int column) throws SQLException {
return 0;
}
@Override
public int getScale(int column) throws SQLException {
return 0;
}
@Override
public String getTableName(int column) throws SQLException {
return null;
}
@Override
public String getCatalogName(int column) throws SQLException {
return null;
}
@Override
public int getColumnType(int column) throws SQLException {
return 0;
}
@Override
public String getColumnTypeName(int column) throws SQLException {
return null;
}
@Override
public boolean isReadOnly(int column) throws SQLException {
return false;
}
@Override
public boolean isWritable(int column) throws SQLException {
return false;
}
@Override
public boolean isDefinitelyWritable(int column) throws SQLException {
return false;
}
@Override
public String getColumnClassName(int column) throws SQLException {
return null;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Arrays;
import java.util.List;
public class RestfulStatement implements Statement {
private final String catalog;
private final RestfulConnection conn;
public RestfulStatement(RestfulConnection c, String catalog) {
this.conn = c;
this.catalog = catalog;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
final String url = "http://" + conn.getHost() + ":"+conn.getPort()+"/rest/sql";
String result = HttpClientPoolUtil.execute(url, sql);
String fields = "";
List<String> words = Arrays.asList(sql.split(" "));
if (words.get(0).equalsIgnoreCase("select")) {
int index = 0;
if (words.contains("from")) {
index = words.indexOf("from");
}
if (words.contains("FROM")) {
index = words.indexOf("FROM");
}
fields = HttpClientPoolUtil.execute(url, "DESCRIBE " + words.get(index + 1));
}
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonObject.getString("desc") + "\n" +
"error code: " + jsonObject.getString("code")));
}
String dataStr = jsonObject.getString("data");
if ("use".equalsIgnoreCase(fields.split(" ")[0])) {
return new RestfulResultSet(dataStr, "");
}
JSONObject jsonField = JSON.parseObject(fields);
if (jsonField == null) {
return new RestfulResultSet(dataStr, "");
}
if (jsonField.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonField.getString("desc") + "\n" +
"error code: " + jsonField.getString("code")));
}
String fieldData = jsonField.getString("data");
return new RestfulResultSet(dataStr, fieldData);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return 0;
}
@Override
public void close() throws SQLException {
}
@Override
public int getMaxFieldSize() throws SQLException {
return 0;
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
}
@Override
public int getMaxRows() throws SQLException {
return 0;
}
@Override
public void setMaxRows(int max) throws SQLException {
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
}
@Override
public int getQueryTimeout() throws SQLException {
return 0;
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
}
@Override
public void cancel() throws SQLException {
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public void setCursorName(String name) throws SQLException {
}
@Override
public boolean execute(String sql) throws SQLException {
return false;
}
@Override
public ResultSet getResultSet() throws SQLException {
return null;
}
@Override
public int getUpdateCount() throws SQLException {
return 0;
}
@Override
public boolean getMoreResults() throws SQLException {
return false;
}
@Override
public void setFetchDirection(int direction) throws SQLException {
}
@Override
public int getFetchDirection() throws SQLException {
return 0;
}
@Override
public void setFetchSize(int rows) throws SQLException {
}
@Override
public int getFetchSize() throws SQLException {
return 0;
}
@Override
public int getResultSetConcurrency() throws SQLException {
return 0;
}
@Override
public int getResultSetType() throws SQLException {
return 0;
}
@Override
public void addBatch(String sql) throws SQLException {
}
@Override
public void clearBatch() throws SQLException {
}
@Override
public int[] executeBatch() throws SQLException {
return new int[0];
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return false;
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return null;
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return 0;
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
return false;
}
@Override
public int getResultSetHoldability() throws SQLException {
return 0;
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
}
@Override
public boolean isPoolable() throws SQLException {
return false;
}
@Override
public void closeOnCompletion() throws SQLException {
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}
package com.taosdata.jdbc.rs.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
public class HttpClientPoolUtil {
public static PoolingHttpClientConnectionManager cm = null;
public static CloseableHttpClient httpClient = null;
/**
* 默认content 类型
*/
private static final String DEFAULT_CONTENT_TYPE = "application/json";
/**
* 默认请求超时时间30s
*/
private static final int DEFAULT_TIME_OUT = 15000;
private static final int count = 32;
private static final int totalCount = 1000;
private static final int Http_Default_Keep_Time = 15000;
/**
* 初始化连接池
*/
public static synchronized void initPools() {
if (httpClient == null) {
cm = new PoolingHttpClientConnectionManager();
cm.setDefaultMaxPerRoute(count);
cm.setMaxTotal(totalCount);
httpClient = HttpClients.custom().setKeepAliveStrategy(defaultStrategy).setConnectionManager(cm).build();
}
}
/**
* Http connection keepAlive 设置
*/
public static ConnectionKeepAliveStrategy defaultStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
int keepTime = Http_Default_Keep_Time * 1000;
while (it.hasNext()) {
HeaderElement headerElement = it.nextElement();
String param = headerElement.getName();
String value = headerElement.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch (Exception e) {
new Exception(
"format KeepAlive timeout exception, exception:" + e.toString())
.printStackTrace();
}
}
}
return keepTime;
};
public static CloseableHttpClient getHttpClient() {
return httpClient;
}
public static PoolingHttpClientConnectionManager getHttpConnectionManager() {
return cm;
}
/**
* 执行http post请求
* 默认采用Content-Type:application/json,Accept:application/json
*
* @param uri 请求地址
* @param data 请求数据
* @return responseBody
*/
public static String execute(String uri, String data) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpEntityEnclosingRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
method.setEntity(new StringEntity(data));
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
// e.printStackTrace();
// logger.error("execute post request exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("execute post request exception, url:"
+ uri + ", exception:" + e.toString() +
", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception(
"close response exception, url:" + uri +
", exception:" + e.toString()
+ ", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
/**
* * 创建请求
*
* @param uri 请求url
* @param methodName 请求的方法类型
* @param contentType contentType类型
* @param timeout 超时时间
* @return HttpRequestBase 返回类型
* @author lisc
*/
public static HttpRequestBase getRequest(String uri, String methodName, String contentType, int timeout) {
if (httpClient == null) {
initPools();
}
HttpRequestBase method;
if (timeout <= 0) {
timeout = DEFAULT_TIME_OUT;
}
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeout * 1000)
.setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000)
.setExpectContinueEnabled(false).build();
if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPut(uri);
} else if (HttpPost.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPost(uri);
} else if (HttpGet.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpGet(uri);
} else {
method = new HttpPost(uri);
}
if (StringUtils.isBlank(contentType)) {
contentType = DEFAULT_CONTENT_TYPE;
}
method.addHeader("Content-Type", contentType);
method.addHeader("Accept", contentType);
method.setConfig(requestConfig);
return method;
}
/**
* 执行GET 请求
*
* @param uri 网址
* @return responseBody
*/
public static String execute(String uri) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = getRequest(uri, HttpGet.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
// logger.info("请求URL: " + uri + "+ 返回状态码:" + httpResponse.getStatusLine().getStatusCode());
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
e.printStackTrace();
// logger.error("execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
// + (System.currentTimeMillis() - startTime));
System.out.println("log:调用 HttpClientPoolUtil execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
+ (System.currentTimeMillis() - startTime));
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ",cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("close response exception, url:" + uri + ", exception:" + e.toString()
+ ",cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
}
\ No newline at end of file
package com.taosdata.jdbc.rs;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class RestfulDriverTest {
@Test
public void testCase001() {
try {
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://master:6041/?user=root&password=taosdata");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from log.log");
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String column = metaData.getColumnLabel(i);
String value = resultSet.getString(i);
System.out.print(column + ":" + value + "\t");
}
System.out.println();
}
statement.close();
connection.close();
} catch (SQLException | ClassNotFoundException e) {
e.printStackTrace();
}
}
@Test
public void testAcceptUrl() throws SQLException {
Driver driver = new RestfulDriver();
boolean isAccept = driver.acceptsURL("jdbc:TAOS-RS://master:6041");
Assert.assertTrue(isAccept);
}
}
......@@ -24,7 +24,7 @@ int32_t dnodeInitVRead();
void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeFreeVReadQueue(void *pRqueue);
#ifdef __cplusplus
}
......
......@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
#ifdef __cplusplus
}
......
......@@ -151,6 +151,13 @@ void dnodeCleanupClient() {
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode not running", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet);
}
......
......@@ -35,7 +35,7 @@ typedef struct {
pthread_mutex_t mutex;
} SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param);
static void *dnodeProcessReadQueue(void *pWorker);
// module global variable
static SVReadWorkerPool tsVReadWP;
......@@ -47,7 +47,7 @@ int32_t dnodeInitVRead() {
tsVReadWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (tsVReadWP.worker == NULL) return -1;
......@@ -85,7 +85,7 @@ void dnodeCleanupVRead() {
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char * pCont = (char *)pMsg->pCont;
char * pCont = pMsg->pCont;
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *)pCont;
......@@ -146,8 +146,8 @@ void *dnodeAllocVReadQueue(void *pVnode) {
return queue;
}
void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue);
void dnodeFreeVReadQueue(void *pRqueue) {
taosCloseQueue(pRqueue);
}
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
......@@ -159,14 +159,12 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
};
rpcSendResponse(&rpcRsp);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
}
static void *dnodeProcessReadQueue(void *param) {
static void *dnodeProcessReadQueue(void *pWorker) {
SVReadMsg *pRead;
int32_t qtype;
void * pVnode;
......@@ -193,7 +191,7 @@ static void *dnodeProcessReadQueue(void *param) {
}
}
taosFreeQitem(pRead);
vnodeFreeFromRQueue(pVnode, pRead);
}
return NULL;
......
......@@ -38,11 +38,11 @@ typedef struct {
} SVWriteWorkerPool;
static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param);
static void *dnodeProcessVWriteQueue(void *pWorker);
int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
......@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
return queue;
}
void dnodeFreeVWriteQueue(void *wqueue) {
taosCloseQueue(wqueue);
void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(pWqueue);
}
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return;
SVWriteMsg *pWrite = param;
void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (wparam == NULL) return;
SVWriteMsg *pWrite = wparam;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
......@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
};
rpcSendResponse(&rpcRsp);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
vnodeFreeFromWQueue(pVnode, pWrite);
}
static void *dnodeProcessVWriteQueue(void *param) {
SVWriteWorker *pWorker = param;
static void *dnodeProcessVWriteQueue(void *wparam) {
SVWriteWorker *pWorker = wparam;
SVWriteMsg * pWrite;
void * pVnode;
int32_t numOfMsgs;
......@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);
}
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
vnodeFreeFromWQueue(pVnode, pWrite);
}
}
}
......
......@@ -54,10 +54,10 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeFreeVReadQueue(void *pRqueue);
int32_t dnodeAllocateMPeerQueue();
void dnodeFreeMPeerQueue();
......
......@@ -201,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
......@@ -46,7 +46,7 @@ extern "C" {
typedef struct {
void *appH;
void *cqH;
int (*notifyStatus)(void *, int status);
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
......@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo);
......
......@@ -23,12 +23,12 @@ extern "C" {
#include "twal.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_RESET,
} EVnStatus;
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
typedef struct {
int32_t len;
......@@ -70,17 +70,19 @@ void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
......
......@@ -40,19 +40,22 @@ typedef struct {
enum _show_db_index {
TSDB_SHOW_DB_NAME_INDEX,
TSDB_SHOW_DB_CREATED_TIME_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_NTABLES_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_REPLICA_INDEX,
TSDB_SHOW_DB_QUORUM_INDEX,
TSDB_SHOW_DB_DAYS_INDEX,
TSDB_SHOW_DB_KEEP_INDEX,
TSDB_SHOW_DB_TABLES_INDEX,
TSDB_SHOW_DB_ROWS_INDEX,
TSDB_SHOW_DB_CACHE_INDEX,
TSDB_SHOW_DB_ABLOCKS_INDEX,
TSDB_SHOW_DB_TBLOCKS_INDEX,
TSDB_SHOW_DB_CTIME_INDEX,
TSDB_SHOW_DB_CLOG_INDEX,
TSDB_SHOW_DB_BLOCKS_INDEX,
TSDB_SHOW_DB_MINROWS_INDEX,
TSDB_SHOW_DB_MAXROWS_INDEX,
TSDB_SHOW_DB_WALLEVEL_INDEX,
TSDB_SHOW_DB_FSYNC_INDEX,
TSDB_SHOW_DB_COMP_INDEX,
TSDB_SHOW_DB_PRECISION_INDEX,
TSDB_SHOW_DB_UPDATE_INDEX,
TSDB_SHOW_DB_STATUS_INDEX,
TSDB_MAX_SHOW_DB
};
......@@ -90,17 +93,23 @@ extern char version[];
typedef struct {
char name[TSDB_DB_NAME_LEN + 1];
int32_t replica;
int32_t days;
int32_t keep;
int32_t tables;
int32_t rows;
int32_t cache;
int32_t ablocks;
int32_t tblocks;
int32_t ctime;
int32_t clog;
int32_t comp;
int32_t tables;
int32_t vgroups;
int16_t replications;
int16_t quorum;
int16_t daysPerFile;
int16_t daysToKeep;
int16_t daysToKeep1;
int16_t daysToKeep2;
int32_t cacheBlockSize; //MB
int32_t totalBlocks;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int8_t walLevel;
int32_t fsyncPeriod;
int8_t compression;
int8_t precision; // time resolution
int8_t update;
} SDbInfo;
typedef struct {
......@@ -173,6 +182,7 @@ static struct argp_option options[] = {
{"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump.", 3},
{"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
{"max-sql-len", 'L', "SQL_LEN", 0, "Max length of one sql. Default is 65480.", 3},
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
{"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
......@@ -200,6 +210,7 @@ struct arguments {
int64_t start_time;
int64_t end_time;
int32_t data_batch;
int32_t max_sql_len;
int32_t table_batch; // num of table which will be dump into one output file.
bool allow_sys;
// other options
......@@ -298,6 +309,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'N':
arguments->data_batch = atoi(arg);
break;
case 'L':
{
int32_t len = atoi(arg);
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < TSDB_MAX_SQL_LEN) {
len = TSDB_MAX_SQL_LEN;
}
arguments->max_sql_len = len;
break;
}
case 't':
arguments->table_batch = atoi(arg);
break;
......@@ -360,6 +382,7 @@ struct arguments tsArguments = {
0,
INT64_MAX,
1,
TSDB_MAX_SQL_LEN,
1,
false,
// other options
......@@ -415,7 +438,9 @@ int main(int argc, char *argv[]) {
printf("start_time: %" PRId64 "\n", tsArguments.start_time);
printf("end_time: %" PRId64 "\n", tsArguments.end_time);
printf("data_batch: %d\n", tsArguments.data_batch);
printf("max_sql_len: %d\n", tsArguments.max_sql_len);
printf("table_batch: %d\n", tsArguments.table_batch);
printf("thread_num: %d\n", tsArguments.thread_num);
printf("allow_sys: %d\n", tsArguments.allow_sys);
printf("abort: %d\n", tsArguments.abort);
printf("isDumpIn: %d\n", tsArguments.isDumpIn);
......@@ -682,8 +707,8 @@ int taosDumpOut(struct arguments *arguments) {
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result)) != NULL) {
// sys database name : 'monitor', but subsequent version changed to 'log'
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "monitor", fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0 &&
// sys database name : 'log', but subsequent version changed to 'log'
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log", fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0 &&
(!arguments->allow_sys))
continue;
......@@ -711,20 +736,27 @@ int taosDumpOut(struct arguments *arguments) {
}
strncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
#if 0
dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]));
dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]));
dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]);
dbInfos[count]->tables = *((int *)row[TSDB_SHOW_DB_TABLES_INDEX]);
dbInfos[count]->rows = *((int *)row[TSDB_SHOW_DB_ROWS_INDEX]);
dbInfos[count]->cache = *((int *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->ablocks = *((int *)row[TSDB_SHOW_DB_ABLOCKS_INDEX]);
dbInfos[count]->tblocks = (int)(*((int16_t *)row[TSDB_SHOW_DB_TBLOCKS_INDEX]));
dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]);
dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX]));
dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
#if 0
if (arguments->with_property) {
dbInfos[count]->tables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
dbInfos[count]->replications = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
dbInfos[count]->daysPerFile = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]);
dbInfos[count]->daysToKeep = *((int16_t *)row[TSDB_SHOW_DB_KEEP_INDEX]);
dbInfos[count]->daysToKeep1;
dbInfos[count]->daysToKeep2;
dbInfos[count]->cacheBlockSize = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->totalBlocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]);
dbInfos[count]->minRowsPerFileBlock = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]);
dbInfos[count]->maxRowsPerFileBlock = *((int32_t *)row[TSDB_SHOW_DB_MAXROWS_INDEX]);
dbInfos[count]->walLevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsyncPeriod = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->compression = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
}
#endif
count++;
if (arguments->databases) {
......@@ -1037,10 +1069,13 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s", dbInfo->name);
if (isDumpProperty) {
#if 0
pstr += sprintf(pstr,
" REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d",
dbInfo->replica, dbInfo->days, dbInfo->keep, dbInfo->tables, dbInfo->rows, dbInfo->cache,
dbInfo->ablocks, dbInfo->tblocks, dbInfo->ctime, dbInfo->clog, dbInfo->comp);
"TABLES %d vgroups %d REPLICA %d quorum %d DAYS %d KEEP %d CACHE %d BLOCKS %d MINROWS %d MAXROWS %d WALLEVEL %d FYNC %d COMP %d PRECISION %s UPDATE %d",
dbInfo->tables, dbInfo->vgroups, dbInfo->replications, dbInfo->quorum, dbInfo->daysPerFile, dbInfo->daysToKeep, dbInfo->cacheBlockSize,
dbInfo->totalBlocks, dbInfo->minRowsPerFileBlock, dbInfo->maxRowsPerFileBlock, dbInfo->walLevel, dbInfo->fsyncPeriod, dbInfo->compression,
dbInfo->precision, dbInfo->update);
#endif
}
pstr += sprintf(pstr, ";");
......@@ -1459,7 +1494,8 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
return -1;
}
char* tmpBuffer = (char *)calloc(1, COMMAND_SIZE);
int32_t sql_buf_len = arguments->max_sql_len;
char* tmpBuffer = (char *)calloc(1, sql_buf_len + 128);
if (tmpBuffer == NULL) {
fprintf(stderr, "failed to allocate memory\n");
free(tmpCommand);
......@@ -1502,85 +1538,83 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
return -1;
}
char sqlStr[8] = "\0";
if (arguments->mysqlFlag) {
sprintf(sqlStr, "INSERT");
} else {
sprintf(sqlStr, "IMPORT");
}
int rowFlag = 0;
int32_t curr_sqlstr_len = 0;
int32_t total_sqlstr_len = 0;
count = 0;
while ((row = taos_fetch_row(tmpResult)) != NULL) {
pstr = tmpBuffer;
curr_sqlstr_len = 0;
int32_t* length = taos_fetch_lengths(tmpResult); // act len
if (count == 0) {
pstr += sprintf(pstr, "%s INTO %s VALUES (", sqlStr, tbname);
} else {
total_sqlstr_len = 0;
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "INSERT INTO %s VALUES (", tbname);
} else {
if (arguments->mysqlFlag) {
if (0 == rowFlag) {
pstr += sprintf(pstr, "(");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "(");
rowFlag++;
} else {
pstr += sprintf(pstr, ", (");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ", (");
}
} else {
pstr += sprintf(pstr, "(");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "(");
}
}
for (int col = 0; col < numFields; col++) {
if (col != 0) pstr += sprintf(pstr, ", ");
if (col != 0) curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ", ");
if (row[col] == NULL) {
pstr += sprintf(pstr, "NULL");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "NULL");
continue;
}
switch (fields[col].type) {
case TSDB_DATA_TYPE_BOOL:
pstr += sprintf(pstr, "%d", ((((int32_t)(*((char *)row[col]))) == 1) ? 1 : 0));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%d", ((((int32_t)(*((char *)row[col]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
pstr += sprintf(pstr, "%d", *((int8_t *)row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%d", *((int8_t *)row[col]));
break;
case TSDB_DATA_TYPE_SMALLINT:
pstr += sprintf(pstr, "%d", *((int16_t *)row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%d", *((int16_t *)row[col]));
break;
case TSDB_DATA_TYPE_INT:
pstr += sprintf(pstr, "%d", *((int32_t *)row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%d", *((int32_t *)row[col]));
break;
case TSDB_DATA_TYPE_BIGINT:
pstr += sprintf(pstr, "%" PRId64 "", *((int64_t *)row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%" PRId64 "", *((int64_t *)row[col]));
break;
case TSDB_DATA_TYPE_FLOAT:
pstr += sprintf(pstr, "%f", GET_FLOAT_VAL(row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%f", GET_FLOAT_VAL(row[col]));
break;
case TSDB_DATA_TYPE_DOUBLE:
pstr += sprintf(pstr, "%f", GET_DOUBLE_VAL(row[col]));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%f", GET_DOUBLE_VAL(row[col]));
break;
case TSDB_DATA_TYPE_BINARY:
*(pstr++) = '\'';
//*(pstr++) = '\'';
converStringToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
pstr = stpcpy(pstr, tbuf);
*(pstr++) = '\'';
//pstr = stpcpy(pstr, tbuf);
//*(pstr++) = '\'';
pstr += sprintf(pstr + curr_sqlstr_len, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_NCHAR:
convertNCharToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
pstr += sprintf(pstr, "\'%s\'", tbuf);
pstr += sprintf(pstr + curr_sqlstr_len, "\'%s\'", tbuf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (!arguments->mysqlFlag) {
pstr += sprintf(pstr, "%" PRId64 "", *(int64_t *)row[col]);
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%" PRId64 "", *(int64_t *)row[col]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[col]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
pstr += sprintf(pstr, "\'%s.%03d\'", buf, (int)(ts % 1000));
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
break;
default:
......@@ -1588,13 +1622,15 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
}
}
pstr += sprintf(pstr, ") ");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ") ");
totalRows++;
count++;
fprintf(fp, "%s", tmpBuffer);
if (count >= arguments->data_batch) {
total_sqlstr_len += curr_sqlstr_len;
if ((count >= arguments->data_batch) || (sql_buf_len - total_sqlstr_len < TSDB_MAX_BYTES_PER_ROW)) {
fprintf(fp, ";\n");
count = 0;
} //else {
......
......@@ -37,7 +37,7 @@ extern "C" {
#endif
#ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1
#define TAOS_EPOLL_WAIT_TIME 500
#endif
#ifdef TAOS_RANDOM_NETWORK_FAIL
......
......@@ -111,6 +111,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL);
*/
void taosMsleep(int mseconds) {
#if 1
usleep(mseconds * 1000);
#else
struct timeval timeout;
int seconds, useconds;
......@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) {
select(0, NULL, NULL, NULL, &timeout);
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
}
#endif
\ No newline at end of file
......@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) {
while (1) {
struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) {
httpDebug("%p, http thread get stop event, exiting...", pThread);
break;
......
......@@ -148,10 +148,12 @@ static void *monitorThreadFunc(void *param) {
}
if (tsMonitor.state == MON_STATE_NOT_INIT) {
int code = 0;
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
......@@ -162,7 +164,7 @@ static void *monitorThreadFunc(void *param) {
}
}
if (tsMonitor.start) {
if (tsMonitor.start && code == 0) {
tsMonitor.state = MON_STATE_INITED;
}
}
......
......@@ -43,7 +43,8 @@ typedef struct SHistogramInfo {
int32_t numOfElems;
int32_t numOfEntries;
int32_t maxEntries;
double min;
double max;
#if defined(USE_ARRAYLIST)
SHistBin* elems;
#else
......@@ -52,9 +53,6 @@ typedef struct SHistogramInfo {
int32_t maxIndex;
bool ordered;
#endif
double min;
double max;
} SHistogramInfo;
SHistogramInfo* tHistogramCreate(int32_t numOfBins);
......
......@@ -171,40 +171,17 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
eventfd_t fd = -1;
// save thread into local variable since pThreadObj is freed when thread exits
// save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread;
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
pthread_detach(pthread_self());
if (!taosCheckPthreadValid(thread)) {
return;
}
if (taosCheckPthreadValid(pThreadObj->thread)) {
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
fd = eventfd(1, 0);
if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
pThreadObj->stop = true;
pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread);
}
}
// at this step, pThreadObj has already been released
if (taosCheckPthreadValid(thread)) {
pthread_join(thread, NULL);
pThreadObj->stop = true;
if (taosComparePthread(thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
if (fd != -1) taosCloseSocket(fd);
pthread_join(thread, NULL);
}
void taosStopTcpServer(void *handle) {
......
......@@ -36,6 +36,7 @@ extern "C" {
#define TAOS_SMSG_STATUS 7
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
......@@ -105,7 +106,7 @@ typedef struct {
int8_t nacks;
int8_t confirmed;
int32_t code;
uint64_t time;
int64_t time;
} SFwdInfo;
typedef struct {
......
......@@ -179,6 +179,13 @@ int64_t syncStart(const SSyncInfo *pInfo) {
for (int32_t i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[i] == NULL) {
sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, pNodeInfo->nodeFqdn,
pNodeInfo->nodePort);
syncStop(pNode->rid);
exit(1);
}
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
}
......@@ -476,7 +483,11 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
if (ip == 0xFFFFFFFF) {
sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno));
terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL;
}
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
......@@ -1193,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds) {;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds) {
int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
pFwdInfo->version, time, pFwdInfo->time);
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
......
......@@ -136,7 +136,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record
void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record
if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
......@@ -237,7 +237,7 @@ static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
if (pRecv == NULL) return -1;
pRecv->bufferSize = 5000000;
pRecv->bufferSize = SYNC_RECV_BUFFER_SIZE;
pRecv->buffer = malloc(pRecv->bufferSize);
if (pRecv->buffer == NULL) {
free(pRecv);
......
......@@ -301,31 +301,14 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
}
static void taosStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) {
return;
}
pThread->stop = true;
if (pThread->thread == pthread_self()) {
if (taosComparePthread(thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
// save thread ID into a local variable, since pThread is freed when the thread exits
pthread_t thread = pThread->thread;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = {.events = EPOLLIN};
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption
sError("failed to create eventfd since %s", strerror(errno));
pthread_cancel(pThread->thread);
pThread->stop = true;
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption
sError("failed to call epoll_ctl since %s", strerror(errno));
pthread_cancel(pThread->thread);
}
pthread_join(thread, NULL);
if (fd >= 0) taosClose(fd);
}
......@@ -235,6 +235,7 @@ typedef struct {
sem_t readyToCommit;
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
} STsdbRepo;
// ------------------ tsdbRWHelper.c
......
此差异已折叠。
......@@ -256,7 +256,8 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
pthread_rwlock_unlock(&pFileH->fhlock);
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
ASSERT(pGroup != NULL);
}
return pGroup;
......
......@@ -134,17 +134,20 @@ _err:
}
// Note: all working thread and query thread must stopped when calling this function
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (repo == NULL) return;
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (repo == NULL) return 0;
STsdbRepo *pRepo = (STsdbRepo *)repo;
int vgId = REPO_ID(pRepo);
terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo);
if (toCommit) {
tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit));
terrno = pRepo->code;
}
tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem);
......@@ -156,6 +159,12 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbCloseMeta(pRepo);
tsdbFreeRepo(pRepo);
tsdbDebug("vgId:%d repository is closed", vgId);
if (terrno != TSDB_CODE_SUCCESS) {
return -1;
} else {
return 0;
}
}
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
......@@ -619,6 +628,7 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
}
pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS;
int code = pthread_mutex_init(&pRepo->mutex, NULL);
if (code != 0) {
......
此差异已折叠。
......@@ -236,6 +236,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
rInfo.offset = lseek(pStore->fd, 0, SEEK_CUR);
if (rInfo.offset < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -254,6 +255,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
if (taosWrite(pStore->fd, cont, contLen) < contLen) {
uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......
......@@ -145,6 +145,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
// forward to put the rest of data
for (int idata = 1; idata < ndata; idata++) {
pDataKey = pSkipList->keyFn(ppData[idata]);
hasDup = false;
// Compare max key
pKey = SL_GET_MAX_KEY(pSkipList);
......@@ -153,8 +154,6 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
}
hasDup = false;
} else {
SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
......@@ -173,7 +172,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) {
if (compare == 0) hasDup = true;
if (compare == 0 && !hasDup) hasDup = true;
break;
} else {
px = p;
......
......@@ -58,13 +58,13 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, strerror(errno));
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret));
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret));
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
......
......@@ -37,10 +37,13 @@ extern int32_t vDebugFlag;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int32_t delay;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t delayMs;
int8_t status;
int8_t role;
int8_t accessState;
int8_t isFull;
uint64_t version; // current version
uint64_t fversion; // version on saved data file
void *wqueue;
......@@ -58,7 +61,7 @@ typedef struct {
char *rootDir;
tsem_t sem;
int8_t dropped;
char db[TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
} SVnodeObj;
void vnodeInitWriteFp(void);
......
......@@ -25,7 +25,7 @@
#include "vnodeCfg.h"
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
strcpy(pVnode->db, vnodeMsg->db);
tstrncpy(pVnode->db, vnodeMsg->db, sizeof(pVnode->db));
pVnode->cfgVersion = vnodeMsg->cfg.cfgVersion;
pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize;
pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks;
......@@ -97,7 +97,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
vError("vgId:%d, failed to read %s, db not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
strcpy(vnodeMsg.db, db->valuestring);
tstrncpy(vnodeMsg.db, db->valuestring, sizeof(vnodeMsg.db));
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) {
......
......@@ -30,7 +30,7 @@
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static int vnodeProcessTsdbStatus(void *arg, int status, int eno);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(void *ahandle, int8_t role);
......@@ -378,9 +378,10 @@ int32_t vnodeClose(int32_t vgId) {
return 0;
}
void vnodeRelease(void *pVnodeRaw) {
if (pVnodeRaw == NULL) return;
SVnodeObj *pVnode = pVnodeRaw;
void vnodeRelease(void *vparam) {
if (vparam == NULL) return;
SVnodeObj *pVnode = vparam;
int32_t code = 0;
int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
......@@ -406,7 +407,7 @@ void vnodeRelease(void *pVnodeRaw) {
}
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1);
code = tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
}
......@@ -418,7 +419,11 @@ void vnodeRelease(void *pVnodeRaw) {
}
if (pVnode->wal) {
walRemoveAllOldFiles(pVnode->wal);
if (code != 0) {
vError("vgId:%d, failed to commit while close tsdb repo, keep wal", pVnode->vgId);
} else {
walRemoveAllOldFiles(pVnode->wal);
}
walClose(pVnode->wal);
pVnode->wal = NULL;
}
......@@ -590,9 +595,16 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// TODO: this is a simple implement
static int vnodeProcessTsdbStatus(void *arg, int status) {
static int vnodeProcessTsdbStatus(void *arg, int status, int eno) {
SVnodeObj *pVnode = arg;
if (eno != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to commit since %s, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, tstrerror(eno),
pVnode->fversion, pVnode->version);
pVnode->isFull = 1;
return 0;
}
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
......@@ -605,6 +617,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
pVnode->isFull = 0;
walRemoveOneOldFile(pVnode->wal);
return vnodeSaveVersion(pVnode);
}
......@@ -630,18 +643,19 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
pVnode->role = role;
dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
cqStart(pVnode->cq);
else
} else {
cqStop(pVnode->cq);
}
}
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
SVnodeObj *pVnode = ahandle;
if (pVnode->delay != mseconds) {
vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
if (pVnode->delayMs != mseconds) {
pVnode->delayMs = mseconds;
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
}
pVnode->delay = mseconds;
}
static int vnodeResetTsdb(SVnodeObj *pVnode) {
......
......@@ -41,8 +41,8 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
// request enters the queue
//
int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
SVnodeObj *pVnode = vparam;
int32_t msgType = pRead->msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) {
......@@ -53,8 +53,8 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead);
}
static int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param;
static int32_t vnodeCheckRead(void *vparam) {
SVnodeObj *pVnode = vparam;
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode);
......@@ -76,6 +76,16 @@ static int32_t vnodeCheckRead(void *param) {
return TSDB_CODE_SUCCESS;
}
void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, free from vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosFreeQitem(pRead);
vnodeRelease(pVnode);
}
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam;
......@@ -108,7 +118,8 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
taosWriteQitem(pVnode->rqueue, qtype, pRead);
return TSDB_CODE_SUCCESS;
......
此差异已折叠。
......@@ -37,7 +37,7 @@ extern int32_t wDebugFlag;
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 3
typedef struct {
......
......@@ -132,7 +132,7 @@ https://www.taosdata.com/cn/all-downloads/
配置完成后,在命令行内使用taos shell连接server端
```shell
C:\TDengine>taos
C:\TDengine>taos -h td01
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
......
此差异已折叠。
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
此差异已折叠。
此差异已折叠。
此差异已折叠。
package com.taosdata.example.mybatisplusdemo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.taosdata.example.mybatisplusdemo.mapper")
public class MybatisplusDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MybatisplusDemoApplication.class, args);
}
}
\ No newline at end of file
package com.taosdata.example.mybatisplusdemo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.taosdata.example.mybatisplusdemo.domain.Weather;
public interface WeatherMapper extends BaseMapper<Weather> {
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册