提交 9c41f5cd 编写于 作者: L liuyq-617

Merge branch 'feature/query' of github.com:taosdata/TDengine into feature/query

......@@ -4,6 +4,9 @@
[submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
url = https://github.com/huskar-t/hivemq-tdengine-extension.git
\ No newline at end of file
......@@ -61,7 +61,7 @@ The use of each configuration item is:
* **port**: This is the `http` service port which enables other application to manage rules by `restful API`.
* **database**: rules are stored in a `sqlite` database, this is the path of the database file (if the file does not exist, the alert application creates it automatically).
* **tdengine**: connection string of `TDEngine` server, note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **tdengine**: connection string of `TDEngine` server (please refer the documentation of GO connector for the detailed format of this string), note the database name should be put in the `sql` field of a rule in most cases, thus it should NOT be included in the string.
* **log > level**: log level, could be `production` or `debug`.
* **log > path**: log output file path.
* **receivers > alertManager**: the alert application pushes alerts to `AlertManager` at this URL.
......
......@@ -58,7 +58,7 @@ $ go build
* **port**:报警监测程序支持使用 `restful API` 对规则进行管理,这个参数用于配置 `http` 服务的侦听端口。
* **database**:报警监测程序将规则保存到了一个 `sqlite` 数据库中,这个参数用于指定数据库文件的路径(不需要提前创建这个文件,如果它不存在,程序会自动创建它)。
* **tdengine**`TDEngine` 的连接字符串,一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
* **tdengine**`TDEngine` 的连接字符串(这个字符串的详细格式说明请见 GO 连接器的文档),一般来说,数据库名应该在报警规则的 `sql` 语句中指定,所以这个字符串中 **不** 应包含数据库名。
* **log > level**:日志的记录级别,可选 `production``debug`
* **log > path**:日志文件的路径。
* **receivers > alertManager**:报警监测程序会将报警推送到 `AlertManager`,在这里指定 `AlertManager` 的接收地址。
......
......@@ -84,6 +84,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
case firing && (alert.State == AlertStateWaiting):
alert.StartsAt = time.Now()
alert.EndsAt = time.Time{}
if rule.For.Nanoseconds() > 0 {
alert.State = AlertStatePending
return false
......@@ -95,6 +96,7 @@ func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
return false
}
alert.StartsAt = alert.StartsAt.Add(rule.For.Duration)
alert.EndsAt = time.Time{}
alert.State = AlertStateFiring
case firing && (alert.State == AlertStateFiring):
......
......@@ -84,9 +84,9 @@ typedef struct SRetrieveSupport {
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, uint32_t nBufferSize);
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
......
......@@ -42,7 +42,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql);
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId);
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
#ifdef __cplusplus
}
......
......@@ -282,6 +282,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -2695,17 +2695,18 @@ static void apercentile_func_second_merge(SQLFunctionCtx *pCtx) {
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
SHistogramInfo * pHisto = pOutput->pHisto;
SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else {
//TODO(dengyihao): avoid memcpy
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
tHistogramDestroy(&pOutput->pHisto);
pOutput->pHisto = pRes;
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......
......@@ -618,7 +618,11 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[pSchema[i].type].aName);
}
......@@ -641,7 +645,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = 0; i < numOfRows; ++i) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result),"%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
......@@ -651,7 +659,11 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
for (int32_t i = numOfRows; i < totalRows; i++) {
uint8_t type = pSchema[i].type;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName,pSchema->bytes);
int32_t bytes = pSchema[i].bytes - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s(%d),", pSchema[i].name,tDataTypeDesc[pSchema[i].type].aName, bytes);
} else {
snprintf(result + strlen(result), TSDB_MAX_BINARY_LEN - strlen(result), "%s %s,", pSchema[i].name, tDataTypeDesc[type].aName);
}
......
......@@ -172,14 +172,14 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SSqlRes* pRes = &pSql->res;
if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p pMemBuffer is NULL", pMemBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscError("%p no local buffer or intermediate result format model", pSql);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
......@@ -197,7 +197,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
}
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
tscDebug("%p retrieved no data", pSql);
return;
}
......@@ -206,7 +206,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_APP_ERROR;
return;
}
......@@ -217,7 +217,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if (pReducer == NULL) {
tscError("%p failed to create local merge structure, out of memory", pSql);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return;
}
......@@ -334,6 +334,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize;
pReducer->finalModel = pFFModel;
assert(pReducer->finalRowSize > 0);
if (pReducer->finalRowSize > 0) {
pReducer->resColModel->capacity /= pReducer->finalRowSize;
......@@ -531,7 +533,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData);
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel,
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel,
pLocalReducer->numOfVnode);
for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
tfree(pLocalReducer->pLocalDataSrc[i]);
......@@ -655,7 +657,7 @@ bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage
}
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
SColumnModel **pFinalModel, uint32_t nBufferSizes) {
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -753,6 +755,18 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
memset(pSchema, 0, sizeof(SSchema) * size);
size = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < size; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
pSchema[i].bytes = pField->field.bytes;
pSchema[i].type = pField->field.type;
tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
}
*pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);
tfree(pSchema);
return TSDB_CODE_SUCCESS;
}
......@@ -763,9 +777,11 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
* @param pFinalModel
* @param numOfVnodes
*/
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
int32_t numOfVnodes) {
destroyColumnModel(pFinalModel);
destroyColumnModel(pFFModel);
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
......@@ -873,10 +889,10 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t)pBeforeFillData->num;
tColModelErase(pLocalReducer->resColModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
......@@ -898,7 +914,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
......@@ -1240,7 +1256,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize);
}
#ifdef _DEBUG_VIEW
......@@ -1610,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->data = pRes->pLocalReducer->pResultBuf->data;
}
void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
char* pbuf = calloc(1, pOutput->num * rowSize);
size_t size = tscNumOfFields(pQueryInfo);
......@@ -1645,8 +1661,10 @@ void doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t r
}
assert(finalRowSize <= rowSize);
memcpy(pOutput->data, pbuf, pOutput->num * finalRowSize);
memcpy(pOutput->data, pbuf, pOutput->num * offset);
tfree(pbuf);
tfree(arithSup.data);
return offset;
}
\ No newline at end of file
......@@ -1148,6 +1148,10 @@ int tsParseInsertSql(SSqlObj *pSql) {
index = 0;
sToken = tStrGetToken(str, &index, false, 0, NULL);
if (sToken.type != TK_STRING && sToken.type != TK_ID) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
goto _error;
}
str += index;
if (sToken.n == 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z);
......
......@@ -547,7 +547,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
......@@ -787,8 +787,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
}
if(tscIsSecondStageQuery(pQueryInfo)) {
size_t output = tscNumOfFields(pQueryInfo);
size_t output = tscNumOfFields(pQueryInfo);
if ((tscIsSecondStageQuery(pQueryInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) ||
UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) && (output != tscSqlExprNumOfExprs(pQueryInfo))) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
......
......@@ -1644,6 +1644,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tExtMemBuffer ** pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
......@@ -1662,7 +1663,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
......@@ -1677,7 +1678,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql);
return ret;
......@@ -1707,6 +1708,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
trs->pFinalColModel = pModel;
trs->pFFColModel = pFinalModel;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
......@@ -1730,13 +1732,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pState->numOfSub);
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
......@@ -1876,7 +1878,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code));
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, trsupport->pFFColModel,
pState->numOfSub);
tscFreeRetrieveSup(pSql);
......
......@@ -56,6 +56,23 @@
<scope>test</scope>
</dependency>
<!-- for restful -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.taosdata.jdbc;
import java.io.*;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;
public abstract class AbstractTaosDriver implements Driver {
private static final String TAOS_CFG_FILENAME = "taos.cfg";
/**
* @param cfgDirPath
* @return return the config dir
**/
protected File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
protected File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
protected List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
protected void loadTaosConfig(Properties info) {
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
}
protected DriverPropertyInfo[] getPropertyInfo(Properties info) {
DriverPropertyInfo hostProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_HOST, info.getProperty(TSDBDriver.PROPERTY_KEY_HOST));
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PORT, info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_DBNAME, info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_USER, info.getProperty(TSDBDriver.PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(TSDBDriver.PROPERTY_KEY_PASSWORD, info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
}
protected Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
// parse properties
int beginningOfSlashes = url.indexOf("//");
int index = url.indexOf("?");
if (index != -1) {
String paramString = url.substring(index + 1, url.length());
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
if (indexOfSlash + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_DBNAME, url.substring(indexOfSlash + 1));
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
if (indexOfColon + 1 < url.length()) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_PORT, url.substring(indexOfColon + 1));
}
url = url.substring(0, indexOfColon);
}
// parse host
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
return urlProps;
}
}
......@@ -16,10 +16,10 @@ package com.taosdata.jdbc;
public class ColumnMetaData {
int colType = 0;
String colName = null;
int colSize = -1;
int colIndex = 0;
private int colType = 0;
private String colName = null;
private int colSize = -1;
private int colIndex = 0;
public int getColSize() {
return colSize;
......
......@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.*;
import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
......@@ -35,11 +34,10 @@ import java.util.*;
import java.util.concurrent.Executor;
public class TSDBConnection implements Connection {
protected Properties props = null;
private TSDBJNIConnector connector = null;
protected Properties props = null;
private String catalog = null;
private TSDBDatabaseMetaData dbMetaData = null;
......@@ -47,15 +45,21 @@ public class TSDBConnection implements Connection {
private Properties clientInfoProps = new Properties();
private int timeoutMilliseconds = 0;
private String tsCharSet = "";
private boolean batchFetch = false;
public TSDBConnection(Properties info, TSDBDatabaseMetaData meta) throws SQLException {
this.dbMetaData = meta;
connect(info.getProperty(TSDBDriver.PROPERTY_KEY_HOST),
Integer.parseInt(info.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "0")),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME), info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME),
info.getProperty(TSDBDriver.PROPERTY_KEY_USER),
info.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
String batchLoad = info.getProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD);
if (batchLoad != null) {
this.batchFetch = Boolean.parseBoolean(batchLoad);
}
}
private void connect(String host, int port, String dbName, String user, String password) throws SQLException {
......@@ -197,7 +201,8 @@ public class TSDBConnection implements Connection {
public SQLWarning getWarnings() throws SQLException {
//todo: implement getWarnings according to the warning messages returned from TDengine
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return null;
// throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
public void clearWarnings() throws SQLException {
......@@ -222,6 +227,14 @@ public class TSDBConnection implements Connection {
return this.prepareStatement(sql);
}
public Boolean getBatchFetch() {
return this.batchFetch;
}
public void setBatchFetch(Boolean batchFetch) {
this.batchFetch = batchFetch;
}
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
......
......@@ -14,7 +14,6 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.io.*;
import java.sql.*;
import java.util.*;
import java.util.logging.Logger;
......@@ -38,7 +37,7 @@ import java.util.logging.Logger;
* register it with the DriverManager. This means that a user can load and
* register a driver by doing Class.forName("foo.bah.Driver")
*/
public class TSDBDriver implements java.sql.Driver {
public class TSDBDriver extends AbstractTaosDriver {
@Deprecated
private static final String URL_PREFIX1 = "jdbc:TSDB://";
......@@ -87,6 +86,11 @@ public class TSDBDriver implements java.sql.Driver {
*/
public static final String PROPERTY_KEY_CHARSET = "charset";
/**
* fetch data from native function in a batch model
*/
public static final String PROPERTY_KEY_BATCH_LOAD = "batch";
private TSDBDatabaseMetaData dbMetaData = null;
static {
......@@ -97,50 +101,6 @@ public class TSDBDriver implements java.sql.Driver {
}
}
private List<String> loadConfigEndpoints(File cfgFile) {
List<String> endpoints = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(cfgFile))) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.trim().startsWith("firstEp") || line.trim().startsWith("secondEp")) {
endpoints.add(line.substring(line.indexOf('p') + 1).trim());
}
if (endpoints.size() > 1)
break;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return endpoints;
}
/**
* @param cfgDirPath
* @return return the config dir
**/
private File loadConfigDir(String cfgDirPath) {
if (cfgDirPath == null)
return loadDefaultConfigDir();
File cfgDir = new File(cfgDirPath);
if (!cfgDir.exists())
return loadDefaultConfigDir();
return cfgDir;
}
/**
* @return search the default config dir, if the config dir is not exist will return null
*/
private File loadDefaultConfigDir() {
File cfgDir;
File cfgDir_linux = new File("/etc/taos");
cfgDir = cfgDir_linux.exists() ? cfgDir_linux : null;
File cfgDir_windows = new File("C:\\TDengine\\cfg");
cfgDir = (cfgDir == null && cfgDir_windows.exists()) ? cfgDir_windows : cfgDir;
return cfgDir;
}
public Connection connect(String url, Properties info) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
......@@ -152,26 +112,12 @@ public class TSDBDriver implements java.sql.Driver {
if ((props = parseURL(url, info)) == null) {
return null;
}
//load taos.cfg start
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> "taos.cfg".equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile);
if (!endpoints.isEmpty()) {
info.setProperty(TSDBDriver.PROPERTY_KEY_HOST, endpoints.get(0).split(":")[0]);
info.setProperty(TSDBDriver.PROPERTY_KEY_PORT, endpoints.get(0).split(":")[1]);
}
}
loadTaosConfig(info);
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR),
(String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET),
(String) props.get(PROPERTY_KEY_TIME_ZONE));
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
Connection newConn = new TSDBConnection(props, this.dbMetaData);
return newConn;
} catch (SQLWarning sqlWarning) {
......@@ -208,39 +154,13 @@ public class TSDBDriver implements java.sql.Driver {
info = parseURL(url, info);
}
DriverPropertyInfo hostProp = new DriverPropertyInfo(PROPERTY_KEY_HOST, info.getProperty(PROPERTY_KEY_HOST));
hostProp.required = false;
hostProp.description = "Hostname";
DriverPropertyInfo portProp = new DriverPropertyInfo(PROPERTY_KEY_PORT, info.getProperty(PROPERTY_KEY_PORT, TSDBConstants.DEFAULT_PORT));
portProp.required = false;
portProp.description = "Port";
DriverPropertyInfo dbProp = new DriverPropertyInfo(PROPERTY_KEY_DBNAME, info.getProperty(PROPERTY_KEY_DBNAME));
dbProp.required = false;
dbProp.description = "Database name";
DriverPropertyInfo userProp = new DriverPropertyInfo(PROPERTY_KEY_USER, info.getProperty(PROPERTY_KEY_USER));
userProp.required = true;
userProp.description = "User";
DriverPropertyInfo passwordProp = new DriverPropertyInfo(PROPERTY_KEY_PASSWORD, info.getProperty(PROPERTY_KEY_PASSWORD));
passwordProp.required = true;
passwordProp.description = "Password";
DriverPropertyInfo[] propertyInfo = new DriverPropertyInfo[5];
propertyInfo[0] = hostProp;
propertyInfo[1] = portProp;
propertyInfo[2] = dbProp;
propertyInfo[3] = userProp;
propertyInfo[4] = passwordProp;
return propertyInfo;
return getPropertyInfo(info);
}
/**
* example: jdbc:TAOS://127.0.0.1:0/db?user=root&password=your_password
*/
@Override
public Properties parseURL(String url, Properties defaults) {
Properties urlProps = (defaults != null) ? defaults : new Properties();
if (url == null || url.length() <= 0 || url.trim().length() <= 0)
......@@ -257,26 +177,21 @@ public class TSDBDriver implements java.sql.Driver {
url = url.substring(0, index);
StringTokenizer queryParams = new StringTokenizer(paramString, "&");
while (queryParams.hasMoreElements()) {
String parameterValuePair = queryParams.nextToken();
int indexOfEqual = parameterValuePair.indexOf("=");
String parameter = null;
String value = null;
if (indexOfEqual != -1) {
parameter = parameterValuePair.substring(0, indexOfEqual);
if (indexOfEqual + 1 < parameterValuePair.length()) {
value = parameterValuePair.substring(indexOfEqual + 1);
}
}
if ((value != null && value.length() > 0) && (parameter != null && parameter.length() > 0)) {
urlProps.setProperty(parameter, value);
String oneToken = queryParams.nextToken();
String[] pair = oneToken.split("=");
if ((pair[0] != null && pair[0].trim().length() > 0) && (pair[1] != null && pair[1].trim().length() > 0)) {
urlProps.setProperty(pair[0].trim(), pair[1].trim());
}
}
}
// parse Product Name
String dbProductName = url.substring(0, beginningOfSlashes);
dbProductName = dbProductName.substring(dbProductName.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
// parse dbname
// parse database name
url = url.substring(beginningOfSlashes + 2);
int indexOfSlash = url.indexOf("/");
if (indexOfSlash != -1) {
......@@ -285,6 +200,7 @@ public class TSDBDriver implements java.sql.Driver {
}
url = url.substring(0, indexOfSlash);
}
// parse port
int indexOfColon = url.indexOf(":");
if (indexOfColon != -1) {
......@@ -293,89 +209,15 @@ public class TSDBDriver implements java.sql.Driver {
}
url = url.substring(0, indexOfColon);
}
if (url != null && url.length() > 0 && url.trim().length() > 0) {
urlProps.setProperty(TSDBDriver.PROPERTY_KEY_HOST, url);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty(TSDBDriver.PROPERTY_KEY_USER));
/*
String urlForMeta = url;
String dbProductName = url.substring(url.indexOf(":") + 1);
dbProductName = dbProductName.substring(0, dbProductName.indexOf(":"));
int beginningOfSlashes = url.indexOf("//");
url = url.substring(beginningOfSlashes + 2);
String host = url.substring(0, url.indexOf(":"));
url = url.substring(url.indexOf(":") + 1);
urlProps.setProperty(PROPERTY_KEY_HOST, host);
String port = url.substring(0, url.indexOf("/"));
urlProps.setProperty(PROPERTY_KEY_PORT, port);
url = url.substring(url.indexOf("/") + 1);
if (url.indexOf("?") != -1) {
String dbName = url.substring(0, url.indexOf("?"));
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
url = url.trim().substring(url.indexOf("?") + 1);
} else {
// without user & password so return
if (!url.trim().isEmpty()) {
String dbName = url.trim();
urlProps.setProperty(PROPERTY_KEY_DBNAME, dbName);
}
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, urlProps.getProperty("user"));
return urlProps;
}
String user = "";
if (url.indexOf("&") == -1) {
String[] kvPair = url.trim().split("=");
if (kvPair.length == 2) {
setPropertyValue(urlProps, kvPair);
return urlProps;
}
}
String[] queryStrings = url.trim().split("&");
for (String queryStr : queryStrings) {
String[] kvPair = queryStr.trim().split("=");
if (kvPair.length < 2) {
continue;
}
setPropertyValue(urlProps, kvPair);
}
user = urlProps.getProperty(PROPERTY_KEY_USER).toString();
this.dbMetaData = new TSDBDatabaseMetaData(dbProductName, urlForMeta, user);
*/
return urlProps;
}
private void setPropertyValue(Properties property, String[] keyValuePair) {
switch (keyValuePair[0].toLowerCase()) {
case PROPERTY_KEY_USER:
property.setProperty(PROPERTY_KEY_USER, keyValuePair[1]);
break;
case PROPERTY_KEY_PASSWORD:
property.setProperty(PROPERTY_KEY_PASSWORD, keyValuePair[1]);
break;
case PROPERTY_KEY_TIME_ZONE:
property.setProperty(PROPERTY_KEY_TIME_ZONE, keyValuePair[1]);
break;
case PROPERTY_KEY_LOCALE:
property.setProperty(PROPERTY_KEY_LOCALE, keyValuePair[1]);
break;
case PROPERTY_KEY_CHARSET:
property.setProperty(PROPERTY_KEY_CHARSET, keyValuePair[1]);
break;
case PROPERTY_KEY_CONFIG_DIR:
property.setProperty(PROPERTY_KEY_CONFIG_DIR, keyValuePair[1]);
break;
}
}
public int getMajorVersion() {
return 2;
}
......
......@@ -49,7 +49,7 @@ public class TSDBResultSet implements ResultSet {
private TSDBResultSetRowData rowData;
private TSDBResultSetBlockData blockData;
private boolean blockwiseFetch = false;
private boolean batchFetch = false;
private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1;
......@@ -71,8 +71,12 @@ public class TSDBResultSet implements ResultSet {
this.resultSetPointer = resultSetPointer;
}
public void setBlockWiseFetch(boolean fetchBlock) {
this.blockwiseFetch = fetchBlock;
public void setBatchFetch(boolean batchFetch) {
this.batchFetch = batchFetch;
}
public Boolean getBatchFetch() {
return this.batchFetch;
}
public List<ColumnMetaData> getColumnMetaDataList() {
......@@ -102,8 +106,8 @@ public class TSDBResultSet implements ResultSet {
public TSDBResultSet() {
}
public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
this.jniConnector = connecter;
public TSDBResultSet(TSDBJNIConnector connector, long resultSetPointer) throws SQLException {
this.jniConnector = connector;
this.resultSetPointer = resultSetPointer;
int code = this.jniConnector.getSchemaMetaData(this.resultSetPointer, this.columnMetaDataList);
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
......@@ -127,13 +131,13 @@ public class TSDBResultSet implements ResultSet {
}
public boolean next() throws SQLException {
if (this.blockwiseFetch) {
if (this.getBatchFetch()) {
if (this.blockData.forward()) {
return true;
}
int code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
this.blockData.resetCursor();
this.blockData.reset();
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
......@@ -185,7 +189,7 @@ public class TSDBResultSet implements ResultSet {
String res = null;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -200,7 +204,7 @@ public class TSDBResultSet implements ResultSet {
boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -216,7 +220,7 @@ public class TSDBResultSet implements ResultSet {
byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -231,7 +235,7 @@ public class TSDBResultSet implements ResultSet {
short res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -246,7 +250,7 @@ public class TSDBResultSet implements ResultSet {
int res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -262,7 +266,7 @@ public class TSDBResultSet implements ResultSet {
long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -277,7 +281,7 @@ public class TSDBResultSet implements ResultSet {
float res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -292,7 +296,7 @@ public class TSDBResultSet implements ResultSet {
double res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
......@@ -334,7 +338,7 @@ public class TSDBResultSet implements ResultSet {
Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
res = this.rowData.getTimestamp(colIndex);
......@@ -454,7 +458,7 @@ public class TSDBResultSet implements ResultSet {
public Object getObject(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
return this.rowData.get(colIndex);
} else {
......@@ -491,7 +495,7 @@ public class TSDBResultSet implements ResultSet {
public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
int colIndex = getTrueColumnIndex(columnIndex);
if (!this.blockwiseFetch) {
if (!this.getBatchFetch()) {
this.lastWasNull = this.rowData.wasNull(colIndex);
return new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
} else {
......
......@@ -56,13 +56,6 @@ public class TSDBResultSetBlockData {
if (this.numOfCols == 0) {
return;
}
this.colData = new ArrayList<Object>(numOfCols);
this.colData.addAll(Collections.nCopies(this.numOfCols, null));
}
public boolean wasNull(int col) {
return colData.get(col) == null;
}
public int getNumOfRows() {
......@@ -82,20 +75,19 @@ public class TSDBResultSetBlockData {
this.clear();
}
public void setColumnData(int col, byte val) {
this.colData.set(col, val);
}
public boolean hasMore() {
return this.rowIndex < this.numOfRows;
}
public boolean forward() {
this.rowIndex++;
return (this.rowIndex < this.numOfRows);
if (this.rowIndex > this.numOfRows) {
return false;
}
return ((++this.rowIndex) < this.numOfRows);
}
public void resetCursor() {
public void reset() {
this.rowIndex = 0;
}
......@@ -172,10 +164,58 @@ public class TSDBResultSetBlockData {
}
}
class NullType {
private static class NullType {
private static final byte NULL_BOOL_VAL = 0x2;
private static final String NULL_STR = "null";
public String toString() {
return new String("null");
return NullType.NULL_STR;
}
public static boolean isBooleanNull(byte val) {
return val == NullType.NULL_BOOL_VAL;
}
private static boolean isTinyIntNull(byte val) {
return val == Byte.MIN_VALUE;
}
private static boolean isSmallIntNull(short val) {
return val == Short.MIN_VALUE;
}
private static boolean isIntNull(int val) {
return val == Integer.MIN_VALUE;
}
private static boolean isBigIntNull(long val) {
return val == Long.MIN_VALUE;
}
private static boolean isFloatNull(float val) {
return Float.isNaN(val);
}
private static boolean isDoubleNull(double val) {
return Double.isNaN(val);
}
private static boolean isBinaryNull(byte[] val, int length) {
if (length != Byte.BYTES) {
return false;
}
return val[0] == 0xFF;
}
private static boolean isNcharNull(byte[] val, int length) {
if (length != Integer.BYTES) {
return false;
}
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
}
}
/**
......@@ -195,50 +235,6 @@ public class TSDBResultSetBlockData {
return obj.toString();
}
private boolean isBooleanNull(byte val) {
return val == 0x2;
}
private boolean isTinyIntNull(byte val) {
return val == 0x80;
}
private boolean isSmallIntNull(short val) {
return val == 0x8000;
}
private boolean isIntNull(int val) {
return val == 0x80000000L;
}
private boolean isBigIntNull(long val) {
return val == 0x8000000000000000L;
}
private boolean isFloatNull(float val) {
return Float.isNaN(val);
}
private boolean isDoubleNull(double val) {
return Double.isNaN(val);
}
private boolean isBinaryNull(byte[] val, int length) {
if (length != 1) {
return false;
}
return val[0] == 0xFF;
}
private boolean isNcharNull(byte[] val, int length) {
if (length != 4) {
return false;
}
return (val[0] & val[1] & val[2] & val[3]) == 0xFF ;
}
public int getInt(int col) {
Object obj = get(col);
if (obj == null) {
......@@ -284,16 +280,16 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_TINYINT:
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_INT: {
return ((int) obj == 0L)? Boolean.FALSE:Boolean.TRUE;
return ((int) obj == 0L) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_BIGINT:
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP: {
return (((Long) obj) == 0L)? Boolean.FALSE:Boolean.TRUE;
return (((Long) obj) == 0L) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
return (((Double) obj) == 0)? Boolean.FALSE:Boolean.TRUE;
return (((Double) obj) == 0) ? Boolean.FALSE : Boolean.TRUE;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
......@@ -395,7 +391,7 @@ public class TSDBResultSetBlockData {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
byte val = bb.get(this.rowIndex);
if (isBooleanNull(val)) {
if (NullType.isBooleanNull(val)) {
return null;
}
......@@ -406,7 +402,7 @@ public class TSDBResultSetBlockData {
ByteBuffer bb = (ByteBuffer) this.colData.get(col);
byte val = bb.get(this.rowIndex);
if (isTinyIntNull(val)) {
if (NullType.isTinyIntNull(val)) {
return null;
}
......@@ -416,7 +412,7 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
ShortBuffer sb = (ShortBuffer) this.colData.get(col);
short val = sb.get(this.rowIndex);
if (isSmallIntNull(val)) {
if (NullType.isSmallIntNull(val)) {
return null;
}
......@@ -426,7 +422,7 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_INT: {
IntBuffer ib = (IntBuffer) this.colData.get(col);
int val = ib.get(this.rowIndex);
if (isIntNull(val)) {
if (NullType.isIntNull(val)) {
return null;
}
......@@ -437,7 +433,7 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
LongBuffer lb = (LongBuffer) this.colData.get(col);
long val = lb.get(this.rowIndex);
if (isBigIntNull(val)) {
if (NullType.isBigIntNull(val)) {
return null;
}
......@@ -447,7 +443,7 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
FloatBuffer fb = (FloatBuffer) this.colData.get(col);
float val = fb.get(this.rowIndex);
if (isFloatNull(val)) {
if (NullType.isFloatNull(val)) {
return null;
}
......@@ -457,7 +453,7 @@ public class TSDBResultSetBlockData {
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
DoubleBuffer lb = (DoubleBuffer) this.colData.get(col);
double val = lb.get(this.rowIndex);
if (isDoubleNull(val)) {
if (NullType.isDoubleNull(val)) {
return null;
}
......@@ -472,7 +468,7 @@ public class TSDBResultSetBlockData {
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (isBinaryNull(dest, length)) {
if (NullType.isBinaryNull(dest, length)) {
return null;
}
......@@ -487,7 +483,7 @@ public class TSDBResultSetBlockData {
byte[] dest = new byte[length];
bb.get(dest, 0, length);
if (isNcharNull(dest, length)) {
if (NullType.isNcharNull(dest, length)) {
return null;
}
......
......@@ -19,7 +19,7 @@ import java.util.ArrayList;
import java.util.List;
public class TSDBStatement implements Statement {
private TSDBJNIConnector connecter = null;
private TSDBJNIConnector connector = null;
/**
* To store batched commands
......@@ -45,9 +45,9 @@ public class TSDBStatement implements Statement {
this.connection = connection;
}
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connecter) {
TSDBStatement(TSDBConnection connection, TSDBJNIConnector connector) {
this.connection = connection;
this.connecter = connecter;
this.connector = connector;
this.isClosed = false;
}
......@@ -65,27 +65,27 @@ public class TSDBStatement implements Statement {
}
// TODO make sure it is not a update query
pSql = this.connecter.executeQuery(sql);
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
// create/insert/update/delete/alter
if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
return null;
}
if (!this.connecter.isUpdateQuery(pSql)) {
TSDBResultSet res = new TSDBResultSet(this.connecter, resultSetPointer);
res.setBlockWiseFetch(true);
if (!this.connector.isUpdateQuery(pSql)) {
TSDBResultSet res = new TSDBResultSet(this.connector, resultSetPointer);
res.setBatchFetch(this.connection.getBatchFetch());
return res;
} else {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
return null;
}
......@@ -97,28 +97,28 @@ public class TSDBStatement implements Statement {
}
// TODO check if current query is update query
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
this.affectedRows = this.connecter.getAffectedRows(pSql);
this.connecter.freeResultSet(pSql);
this.affectedRows = this.connector.getAffectedRows(pSql);
this.connector.freeResultSet(pSql);
return this.affectedRows;
}
public String getErrorMsg(long pSql) {
return this.connecter.getErrMsg(pSql);
return this.connector.getErrMsg(pSql);
}
public void close() throws SQLException {
if (!isClosed) {
if (!this.connecter.isResultsetClosed()) {
this.connecter.freeResultSet();
if (!this.connector.isResultsetClosed()) {
this.connector.freeResultSet();
}
isClosed = true;
}
......@@ -174,15 +174,15 @@ public class TSDBStatement implements Statement {
throw new SQLException("Invalid method call on a closed statement.");
}
boolean res = true;
pSql = this.connecter.executeQuery(sql);
long resultSetPointer = this.connecter.getResultSet();
pSql = this.connector.executeQuery(sql);
long resultSetPointer = this.connector.getResultSet();
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
} else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
// no result set is retrieved
this.connecter.freeResultSet(pSql);
this.connector.freeResultSet(pSql);
res = false;
}
......@@ -193,10 +193,10 @@ public class TSDBStatement implements Statement {
if (isClosed) {
throw new SQLException("Invalid method call on a closed statement.");
}
long resultSetPointer = connecter.getResultSet();
long resultSetPointer = connector.getResultSet();
TSDBResultSet resSet = null;
if (resultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
resSet = new TSDBResultSet(connecter, resultSetPointer);
resSet = new TSDBResultSet(connector, resultSetPointer);
}
return resSet;
}
......@@ -269,7 +269,7 @@ public class TSDBStatement implements Statement {
}
public Connection getConnection() throws SQLException {
if (this.connecter != null)
if (this.connector != null)
return this.connection;
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
......
package com.taosdata.jdbc.rs;
import com.taosdata.jdbc.TSDBConstants;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
public class RestfulConnection implements Connection {
private final String host;
private final int port;
private final Properties props;
private final String database;
private final String url;
public RestfulConnection(String host, String port, Properties props, String database, String url) {
this.host = host;
this.port = Integer.parseInt(port);
this.props = props;
this.database = database;
this.url = url;
}
@Override
public Statement createStatement() throws SQLException {
if (isClosed())
throw new SQLException(TSDBConstants.WrapErrMsg("restful TDengine connection is closed."));
return new RestfulStatement(this, this.database);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
}
@Override
public void commit() throws SQLException {
}
@Override
public void rollback() throws SQLException {
}
@Override
public void close() throws SQLException {
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
//TODO: RestfulDatabaseMetaData is not implemented
return new RestfulDatabaseMetaData();
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public void setCatalog(String catalog) throws SQLException {
}
@Override
public String getCatalog() throws SQLException {
return null;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
}
@Override
public int getTransactionIsolation() throws SQLException {
return 0;
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
}
@Override
public void setHoldability(int holdability) throws SQLException {
}
@Override
public int getHoldability() throws SQLException {
return 0;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
}
@Override
public Clob createClob() throws SQLException {
return null;
}
@Override
public Blob createBlob() throws SQLException {
return null;
}
@Override
public NClob createNClob() throws SQLException {
return null;
}
@Override
public SQLXML createSQLXML() throws SQLException {
return null;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return null;
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return null;
}
@Override
public void setSchema(String schema) throws SQLException {
}
@Override
public String getSchema() throws SQLException {
return null;
}
@Override
public void abort(Executor executor) throws SQLException {
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
}
@Override
public int getNetworkTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public Properties getProps() {
return props;
}
public String getDatabase() {
return database;
}
public String getUrl() {
return url;
}
}
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.AbstractTaosDriver;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Properties;
import java.util.logging.Logger;
public class RestfulDriver extends AbstractTaosDriver {
private static final String URL_PREFIX = "jdbc:TAOS-RS://";
static {
try {
DriverManager.registerDriver(new RestfulDriver());
} catch (SQLException e) {
throw new RuntimeException(TSDBConstants.WrapErrMsg("can not register Restful JDBC driver"), e);
}
}
@Override
public Connection connect(String url, Properties info) throws SQLException {
// throw SQLException if url is null
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is not set!"));
// return null if url is not be accepted
if (!acceptsURL(url))
return null;
Properties props = parseURL(url, info);
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME);
String loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + "/"
+ props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD) + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
}
return new RestfulConnection(host, port, props, database, url);
}
@Override
public boolean acceptsURL(String url) throws SQLException {
if (url == null)
throw new SQLException(TSDBConstants.WrapErrMsg("url is null"));
return (url != null && url.length() > 0 && url.trim().length() > 0) && url.startsWith(URL_PREFIX);
}
@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
if (info == null) {
info = new Properties();
}
if (acceptsURL(url)) {
info = parseURL(url, info);
}
return getPropertyInfo(info);
}
@Override
public int getMajorVersion() {
return 2;
}
@Override
public int getMinorVersion() {
return 0;
}
@Override
public boolean jdbcCompliant() {
return false;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
}
package com.taosdata.jdbc.rs;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
public class RestfulResultSetMetaData implements ResultSetMetaData {
private List<String> fields;
public RestfulResultSetMetaData(List<String> fields) {
this.fields = fields;
}
@Override
public int getColumnCount() throws SQLException {
return fields.size();
}
@Override
public boolean isAutoIncrement(int column) throws SQLException {
return false;
}
@Override
public boolean isCaseSensitive(int column) throws SQLException {
return false;
}
@Override
public boolean isSearchable(int column) throws SQLException {
return false;
}
@Override
public boolean isCurrency(int column) throws SQLException {
return false;
}
@Override
public int isNullable(int column) throws SQLException {
return 0;
}
@Override
public boolean isSigned(int column) throws SQLException {
return false;
}
@Override
public int getColumnDisplaySize(int column) throws SQLException {
return 0;
}
@Override
public String getColumnLabel(int column) throws SQLException {
return fields.get(column - 1);
}
@Override
public String getColumnName(int column) throws SQLException {
return null;
}
@Override
public String getSchemaName(int column) throws SQLException {
return null;
}
@Override
public int getPrecision(int column) throws SQLException {
return 0;
}
@Override
public int getScale(int column) throws SQLException {
return 0;
}
@Override
public String getTableName(int column) throws SQLException {
return null;
}
@Override
public String getCatalogName(int column) throws SQLException {
return null;
}
@Override
public int getColumnType(int column) throws SQLException {
return 0;
}
@Override
public String getColumnTypeName(int column) throws SQLException {
return null;
}
@Override
public boolean isReadOnly(int column) throws SQLException {
return false;
}
@Override
public boolean isWritable(int column) throws SQLException {
return false;
}
@Override
public boolean isDefinitelyWritable(int column) throws SQLException {
return false;
}
@Override
public String getColumnClassName(int column) throws SQLException {
return null;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}
package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import java.sql.*;
import java.util.Arrays;
import java.util.List;
public class RestfulStatement implements Statement {
private final String catalog;
private final RestfulConnection conn;
public RestfulStatement(RestfulConnection c, String catalog) {
this.conn = c;
this.catalog = catalog;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
final String url = "http://" + conn.getHost() + ":"+conn.getPort()+"/rest/sql";
String result = HttpClientPoolUtil.execute(url, sql);
String fields = "";
List<String> words = Arrays.asList(sql.split(" "));
if (words.get(0).equalsIgnoreCase("select")) {
int index = 0;
if (words.contains("from")) {
index = words.indexOf("from");
}
if (words.contains("FROM")) {
index = words.indexOf("FROM");
}
fields = HttpClientPoolUtil.execute(url, "DESCRIBE " + words.get(index + 1));
}
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonObject.getString("desc") + "\n" +
"error code: " + jsonObject.getString("code")));
}
String dataStr = jsonObject.getString("data");
if ("use".equalsIgnoreCase(fields.split(" ")[0])) {
return new RestfulResultSet(dataStr, "");
}
JSONObject jsonField = JSON.parseObject(fields);
if (jsonField == null) {
return new RestfulResultSet(dataStr, "");
}
if (jsonField.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonField.getString("desc") + "\n" +
"error code: " + jsonField.getString("code")));
}
String fieldData = jsonField.getString("data");
return new RestfulResultSet(dataStr, fieldData);
}
@Override
public int executeUpdate(String sql) throws SQLException {
return 0;
}
@Override
public void close() throws SQLException {
}
@Override
public int getMaxFieldSize() throws SQLException {
return 0;
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
}
@Override
public int getMaxRows() throws SQLException {
return 0;
}
@Override
public void setMaxRows(int max) throws SQLException {
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
}
@Override
public int getQueryTimeout() throws SQLException {
return 0;
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
}
@Override
public void cancel() throws SQLException {
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public void setCursorName(String name) throws SQLException {
}
@Override
public boolean execute(String sql) throws SQLException {
return false;
}
@Override
public ResultSet getResultSet() throws SQLException {
return null;
}
@Override
public int getUpdateCount() throws SQLException {
return 0;
}
@Override
public boolean getMoreResults() throws SQLException {
return false;
}
@Override
public void setFetchDirection(int direction) throws SQLException {
}
@Override
public int getFetchDirection() throws SQLException {
return 0;
}
@Override
public void setFetchSize(int rows) throws SQLException {
}
@Override
public int getFetchSize() throws SQLException {
return 0;
}
@Override
public int getResultSetConcurrency() throws SQLException {
return 0;
}
@Override
public int getResultSetType() throws SQLException {
return 0;
}
@Override
public void addBatch(String sql) throws SQLException {
}
@Override
public void clearBatch() throws SQLException {
}
@Override
public int[] executeBatch() throws SQLException {
return new int[0];
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return false;
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return null;
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return 0;
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
return false;
}
@Override
public int getResultSetHoldability() throws SQLException {
return 0;
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
}
@Override
public boolean isPoolable() throws SQLException {
return false;
}
@Override
public void closeOnCompletion() throws SQLException {
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}
package com.taosdata.jdbc.rs.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
public class HttpClientPoolUtil {
public static PoolingHttpClientConnectionManager cm = null;
public static CloseableHttpClient httpClient = null;
/**
* 默认content 类型
*/
private static final String DEFAULT_CONTENT_TYPE = "application/json";
/**
* 默认请求超时时间30s
*/
private static final int DEFAULT_TIME_OUT = 15000;
private static final int count = 32;
private static final int totalCount = 1000;
private static final int Http_Default_Keep_Time = 15000;
/**
* 初始化连接池
*/
public static synchronized void initPools() {
if (httpClient == null) {
cm = new PoolingHttpClientConnectionManager();
cm.setDefaultMaxPerRoute(count);
cm.setMaxTotal(totalCount);
httpClient = HttpClients.custom().setKeepAliveStrategy(defaultStrategy).setConnectionManager(cm).build();
}
}
/**
* Http connection keepAlive 设置
*/
public static ConnectionKeepAliveStrategy defaultStrategy = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
int keepTime = Http_Default_Keep_Time * 1000;
while (it.hasNext()) {
HeaderElement headerElement = it.nextElement();
String param = headerElement.getName();
String value = headerElement.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
try {
return Long.parseLong(value) * 1000;
} catch (Exception e) {
new Exception(
"format KeepAlive timeout exception, exception:" + e.toString())
.printStackTrace();
}
}
}
return keepTime;
};
public static CloseableHttpClient getHttpClient() {
return httpClient;
}
public static PoolingHttpClientConnectionManager getHttpConnectionManager() {
return cm;
}
/**
* 执行http post请求
* 默认采用Content-Type:application/json,Accept:application/json
*
* @param uri 请求地址
* @param data 请求数据
* @return responseBody
*/
public static String execute(String uri, String data) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpEntityEnclosingRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
method.setEntity(new StringEntity(data));
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
// e.printStackTrace();
// logger.error("execute post request exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("execute post request exception, url:"
+ uri + ", exception:" + e.toString() +
", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception(
"close response exception, url:" + uri +
", exception:" + e.toString()
+ ", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
/**
* * 创建请求
*
* @param uri 请求url
* @param methodName 请求的方法类型
* @param contentType contentType类型
* @param timeout 超时时间
* @return HttpRequestBase 返回类型
* @author lisc
*/
public static HttpRequestBase getRequest(String uri, String methodName, String contentType, int timeout) {
if (httpClient == null) {
initPools();
}
HttpRequestBase method;
if (timeout <= 0) {
timeout = DEFAULT_TIME_OUT;
}
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeout * 1000)
.setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000)
.setExpectContinueEnabled(false).build();
if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPut(uri);
} else if (HttpPost.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpPost(uri);
} else if (HttpGet.METHOD_NAME.equalsIgnoreCase(methodName)) {
method = new HttpGet(uri);
} else {
method = new HttpPost(uri);
}
if (StringUtils.isBlank(contentType)) {
contentType = DEFAULT_CONTENT_TYPE;
}
method.addHeader("Content-Type", contentType);
method.addHeader("Accept", contentType);
method.setConfig(requestConfig);
return method;
}
/**
* 执行GET 请求
*
* @param uri 网址
* @return responseBody
*/
public static String execute(String uri) {
long startTime = System.currentTimeMillis();
HttpEntity httpEntity = null;
HttpRequestBase method = null;
String responseBody = "";
try {
if (httpClient == null) {
initPools();
}
method = getRequest(uri, HttpGet.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, "UTF-8");
// logger.info("请求URL: " + uri + "+ 返回状态码:" + httpResponse.getStatusLine().getStatusCode());
}
} catch (Exception e) {
if (method != null) {
method.abort();
}
e.printStackTrace();
// logger.error("execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
// + (System.currentTimeMillis() - startTime));
System.out.println("log:调用 HttpClientPoolUtil execute get request exception, url:" + uri + ", exception:" + e.toString() + ",cost time(ms):"
+ (System.currentTimeMillis() - startTime));
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ",cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("close response exception, url:" + uri + ", exception:" + e.toString()
+ ",cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
}
}
}
return responseBody;
}
}
\ No newline at end of file
package com.taosdata.jdbc.rs;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class RestfulDriverTest {
@Test
public void testCase001() {
try {
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://master:6041/?user=root&password=taosdata");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from log.log");
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String column = metaData.getColumnLabel(i);
String value = resultSet.getString(i);
System.out.print(column + ":" + value + "\t");
}
System.out.println();
}
statement.close();
connection.close();
} catch (SQLException | ClassNotFoundException e) {
e.printStackTrace();
}
}
@Test
public void testAcceptUrl() throws SQLException {
Driver driver = new RestfulDriver();
boolean isAccept = driver.acceptsURL("jdbc:TAOS-RS://master:6041");
Assert.assertTrue(isAccept);
}
}
......@@ -24,7 +24,7 @@ int32_t dnodeInitVRead();
void dnodeCleanupVRead();
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeFreeVReadQueue(void *pRqueue);
#ifdef __cplusplus
}
......
......@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
#ifdef __cplusplus
}
......
......@@ -151,6 +151,13 @@ void dnodeCleanupClient() {
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode not running", pMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateEpSetForPeer(pEpSet);
}
......
......@@ -35,7 +35,7 @@ typedef struct {
pthread_mutex_t mutex;
} SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param);
static void *dnodeProcessReadQueue(void *pWorker);
// module global variable
static SVReadWorkerPool tsVReadWP;
......@@ -47,7 +47,7 @@ int32_t dnodeInitVRead() {
tsVReadWP.min = tsNumOfCores;
tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
tsVReadWP.worker = calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (tsVReadWP.worker == NULL) return -1;
......@@ -85,7 +85,7 @@ void dnodeCleanupVRead() {
void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char * pCont = (char *)pMsg->pCont;
char * pCont = pMsg->pCont;
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *)pCont;
......@@ -146,8 +146,8 @@ void *dnodeAllocVReadQueue(void *pVnode) {
return queue;
}
void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue);
void dnodeFreeVReadQueue(void *pRqueue) {
taosCloseQueue(pRqueue);
}
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
......@@ -159,14 +159,12 @@ void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
};
rpcSendResponse(&rpcRsp);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
}
static void *dnodeProcessReadQueue(void *param) {
static void *dnodeProcessReadQueue(void *pWorker) {
SVReadMsg *pRead;
int32_t qtype;
void * pVnode;
......@@ -193,7 +191,7 @@ static void *dnodeProcessReadQueue(void *param) {
}
}
taosFreeQitem(pRead);
vnodeFreeFromRQueue(pVnode, pRead);
}
return NULL;
......
......@@ -38,11 +38,11 @@ typedef struct {
} SVWriteWorkerPool;
static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param);
static void *dnodeProcessVWriteQueue(void *pWorker);
int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL);
......@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
return queue;
}
void dnodeFreeVWriteQueue(void *wqueue) {
taosCloseQueue(wqueue);
void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(pWqueue);
}
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
if (param == NULL) return;
SVWriteMsg *pWrite = param;
void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (wparam == NULL) return;
SVWriteMsg *pWrite = wparam;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
......@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
};
rpcSendResponse(&rpcRsp);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
vnodeFreeFromWQueue(pVnode, pWrite);
}
static void *dnodeProcessVWriteQueue(void *param) {
SVWriteWorker *pWorker = param;
static void *dnodeProcessVWriteQueue(void *wparam) {
SVWriteWorker *pWorker = wparam;
SVWriteMsg * pWrite;
void * pVnode;
int32_t numOfMsgs;
......@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);
}
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
vnodeFreeFromWQueue(pVnode, pWrite);
}
}
}
......
......@@ -54,10 +54,10 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeFreeVReadQueue(void *pRqueue);
int32_t dnodeAllocateMPeerQueue();
void dnodeFreeMPeerQueue();
......
......@@ -201,6 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
......@@ -46,7 +46,7 @@ extern "C" {
typedef struct {
void *appH;
void *cqH;
int (*notifyStatus)(void *, int status);
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle);
......@@ -83,7 +83,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo);
......
......@@ -23,12 +23,12 @@ extern "C" {
#include "twal.h"
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_RESET,
} EVnStatus;
TAOS_VN_STATUS_INIT = 0,
TAOS_VN_STATUS_READY = 1,
TAOS_VN_STATUS_CLOSING = 2,
TAOS_VN_STATUS_UPDATING = 3,
TAOS_VN_STATUS_RESET = 4,
} EVnodeStatus;
typedef struct {
int32_t len;
......@@ -70,17 +70,19 @@ void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
......
此差异已折叠。
......@@ -37,7 +37,7 @@ extern "C" {
#endif
#ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1
#define TAOS_EPOLL_WAIT_TIME 500
#endif
#ifdef TAOS_RANDOM_NETWORK_FAIL
......
......@@ -111,6 +111,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL);
*/
void taosMsleep(int mseconds) {
#if 1
usleep(mseconds * 1000);
#else
struct timeval timeout;
int seconds, useconds;
......@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) {
select(0, NULL, NULL, NULL, &timeout);
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
}
#endif
\ No newline at end of file
......@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) {
while (1) {
struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) {
httpDebug("%p, http thread get stop event, exiting...", pThread);
break;
......
......@@ -148,10 +148,12 @@ static void *monitorThreadFunc(void *param) {
}
if (tsMonitor.state == MON_STATE_NOT_INIT) {
int code = 0;
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
......@@ -162,7 +164,7 @@ static void *monitorThreadFunc(void *param) {
}
}
if (tsMonitor.start) {
if (tsMonitor.start && code == 0) {
tsMonitor.state = MON_STATE_INITED;
}
}
......
......@@ -43,7 +43,8 @@ typedef struct SHistogramInfo {
int32_t numOfElems;
int32_t numOfEntries;
int32_t maxEntries;
double min;
double max;
#if defined(USE_ARRAYLIST)
SHistBin* elems;
#else
......@@ -52,9 +53,6 @@ typedef struct SHistogramInfo {
int32_t maxIndex;
bool ordered;
#endif
double min;
double max;
} SHistogramInfo;
SHistogramInfo* tHistogramCreate(int32_t numOfBins);
......
......@@ -171,40 +171,17 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
eventfd_t fd = -1;
// save thread into local variable since pThreadObj is freed when thread exits
// save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread;
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
pthread_detach(pthread_self());
if (!taosCheckPthreadValid(thread)) {
return;
}
if (taosCheckPthreadValid(pThreadObj->thread)) {
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
fd = eventfd(1, 0);
if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
pThreadObj->stop = true;
pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
pthread_cancel(pThreadObj->thread);
}
}
// at this step, pThreadObj has already been released
if (taosCheckPthreadValid(thread)) {
pthread_join(thread, NULL);
pThreadObj->stop = true;
if (taosComparePthread(thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
if (fd != -1) taosCloseSocket(fd);
pthread_join(thread, NULL);
}
void taosStopTcpServer(void *handle) {
......
......@@ -36,6 +36,7 @@ extern "C" {
#define TAOS_SMSG_STATUS 7
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
......@@ -105,7 +106,7 @@ typedef struct {
int8_t nacks;
int8_t confirmed;
int32_t code;
uint64_t time;
int64_t time;
} SFwdInfo;
typedef struct {
......
......@@ -179,6 +179,13 @@ int64_t syncStart(const SSyncInfo *pInfo) {
for (int32_t i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[i] == NULL) {
sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, pNodeInfo->nodeFqdn,
pNodeInfo->nodePort);
syncStop(pNode->rid);
exit(1);
}
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i;
}
......@@ -476,7 +483,11 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL;
if (ip == 0xFFFFFFFF) {
sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno));
terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL;
}
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL;
......@@ -1193,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds) {;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds) {
int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex));
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break;
if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
pFwdInfo->version, time, pFwdInfo->time);
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
......
......@@ -136,7 +136,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record
void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record
if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
......@@ -237,7 +237,7 @@ static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
if (pRecv == NULL) return -1;
pRecv->bufferSize = 5000000;
pRecv->bufferSize = SYNC_RECV_BUFFER_SIZE;
pRecv->buffer = malloc(pRecv->bufferSize);
if (pRecv->buffer == NULL) {
free(pRecv);
......
......@@ -301,31 +301,14 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
}
static void taosStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) {
return;
}
pThread->stop = true;
if (pThread->thread == pthread_self()) {
if (taosComparePthread(thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
// save thread ID into a local variable, since pThread is freed when the thread exits
pthread_t thread = pThread->thread;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = {.events = EPOLLIN};
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption
sError("failed to create eventfd since %s", strerror(errno));
pthread_cancel(pThread->thread);
pThread->stop = true;
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption
sError("failed to call epoll_ctl since %s", strerror(errno));
pthread_cancel(pThread->thread);
}
pthread_join(thread, NULL);
if (fd >= 0) taosClose(fd);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册