提交 f0b93abb 编写于 作者: H hjxilinx

[tbase-874]

上级 3d591668
......@@ -34,8 +34,8 @@ extern "C" {
#include "tglobalcfg.h"
#include "tlog.h"
#include "tscCache.h"
#include "tsdb.h"
#include "tscSQLParser.h"
#include "tsdb.h"
#include "tsqlfunction.h"
#include "tutil.h"
......@@ -219,22 +219,22 @@ typedef struct STagCond {
} STagCond;
typedef struct SParamInfo {
int32_t idx;
char type;
uint8_t timePrec;
short bytes;
int32_t idx;
char type;
uint8_t timePrec;
short bytes;
uint32_t offset;
} SParamInfo;
typedef struct STableDataBlocks {
char meterId[TSDB_METER_ID_LEN];
int8_t tsSource;
bool ordered;
bool ordered;
int64_t vgid;
int64_t prevTS;
int32_t numOfMeters;
int32_t numOfMeters;
int32_t rowSize;
uint32_t nAllocSize;
......@@ -245,9 +245,9 @@ typedef struct STableDataBlocks {
};
// for parameter ('?') binding
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo* params;
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo *params;
} STableDataBlocks;
typedef struct SDataBlockList {
......@@ -262,18 +262,17 @@ typedef struct SDataBlockList {
typedef struct {
SOrderVal order;
int command;
// TODO refactor
int count;
int16_t isInsertFromFile; // load data from file or not
int count;// TODO refactor
union {
bool existsCheck;
int8_t showType;
bool existsCheck; // check if the table exists
int8_t showType; // show command type
int8_t isInsertFromFile; // load data from file or not
};
bool import; // import/insert type
char msgType;
uint16_t type;
uint16_t type; // query type
char intervalTimeUnit;
int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval
......@@ -286,20 +285,20 @@ typedef struct {
*
* In such cases, allocate the memory dynamically, and need to free the memory
*/
uint32_t allocSize;
char * payload;
int payloadLen;
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
short numOfCols;
SColumnBaseInfo colList;
SFieldInfo fieldsInfo;
SSqlExprInfo exprsInfo;
SLimitVal limit;
SLimitVal slimit;
int64_t globalLimit;
STagCond tagCond;
int16_t vnodeIdx; // vnode index in pMetricMeta for metric query
int16_t interpoType; // interpolate type
int16_t numOfTables;
SFieldInfo fieldsInfo;
SSqlExprInfo exprsInfo;
SLimitVal limit;
SLimitVal slimit;
int64_t globalLimit;
STagCond tagCond;
int16_t vnodeIdx; // vnode index in pMetricMeta for metric query
int16_t interpoType; // interpolate type
int16_t numOfTables;
// submit data blocks branched according to vnode
SDataBlockList * pDataBlocks;
......@@ -430,11 +429,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion);
void tscInitMsgs();
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle);
int tscProcessSql(SSqlObj *pSql);
int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId);
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param);
......@@ -448,18 +447,12 @@ int taos_retrieve(TAOS_RES *res);
* before send query message to vnode
*/
int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd);
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
/**
* release both metric/meter meta information
* @param pCmd SSqlCmd object that contains the metric/meter meta info
*/
void tscClearSqlMetaInfo(SSqlCmd *pCmd);
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
......@@ -479,12 +472,12 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
int32_t tscBuildResultsForEmptyRetrieval(SSqlObj *pSql);
bool tscIsUpdateQuery(STscObj *pObj);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
......
......@@ -40,6 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
*/
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
......@@ -54,18 +55,17 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscError("sql string too long");
tscQueueAsyncError(fp, param);
return;
}
}
taosNotePrintTsc(sqlstr);
SSqlObj *pSql = (SSqlObj *)malloc(sizeof(SSqlObj));
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param);
return;
}
memset(pSql, 0, sizeof(SSqlObj));
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......
此差异已折叠。
此差异已折叠。
......@@ -123,6 +123,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
return memcmp(p1, p2, size) == 0;
}
//todo refactor
static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delimiter) {
......
......@@ -1415,7 +1415,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
pMsg = pStart;
pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->import = pSql->cmd.order.order;
pShellMsg->import = pSql->cmd.import;
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.count); // number of meters to be inserted
......@@ -3453,31 +3453,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
return 0;
}
static void doDecompressPayload(SSqlCmd *pCmd, SSqlRes *pRes, int16_t compressed) {
if (compressed && pRes->numOfRows > 0) {
SRetrieveMeterRsp *pRetrieve = (SRetrieveMeterRsp *)pRes->pRsp;
int32_t numOfTotalCols = pCmd->fieldsInfo.numOfOutputCols + pCmd->fieldsInfo.numOfHiddenCols;
int32_t rowSize = pCmd->fieldsInfo.pOffset[numOfTotalCols - 1] + pCmd->fieldsInfo.pFields[numOfTotalCols - 1].bytes;
// TODO handle the OOM problem
char * buf = malloc(rowSize * pRes->numOfRows);
int32_t payloadSize = pRes->rspLen - 1 - sizeof(SRetrieveMeterRsp);
assert(payloadSize > 0);
int32_t decompressedSize = tsDecompressString(pRetrieve->data, payloadSize, 1, buf, rowSize * pRes->numOfRows, 0, 0, 0);
assert(decompressedSize == rowSize * pRes->numOfRows);
pRes->pRsp = realloc(pRes->pRsp, pRes->rspLen - payloadSize + decompressedSize);
memcpy(pRes->pRsp + sizeof(SRetrieveMeterRsp), buf, decompressedSize);
free(buf);
}
pRes->data = ((SRetrieveMeterRsp *)pRes->pRsp)->data;
}
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -3490,9 +3465,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds);
pRetrieve->compress = htons(pRetrieve->compress);
doDecompressPayload(pCmd, pRes, pRetrieve->compress);
pRes->data = pRetrieve->data;
tscSetResultPointer(pCmd, pRes);
pRes->row = 0;
......
......@@ -246,7 +246,12 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
tscDoQuery(pSql);
}
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
} else {
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
}
......@@ -266,8 +271,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
size_t sqlLen = strlen(sqlstr);
if (sqlLen > TSDB_MAX_SQL_LEN) {
tscError("%p sql too long", pSql);
pRes->code = TSDB_CODE_INVALID_SQL;
pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code;
}
......@@ -276,8 +282,9 @@ int taos_query(TAOS *taos, const char *sqlstr) {
void *sql = realloc(pSql->sqlstr, sqlLen + 1);
if (sql == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code;
}
......@@ -777,9 +784,9 @@ int taos_errno(TAOS *taos) {
}
char *taos_errstr(TAOS *taos) {
STscObj * pObj = (STscObj *)taos;
unsigned char code;
char temp[256] = {0};
STscObj *pObj = (STscObj *)taos;
uint8_t code;
// char temp[256] = {0};
if (pObj == NULL || pObj->signature != pObj) return tsError[globalCode];
......@@ -788,9 +795,10 @@ char *taos_errstr(TAOS *taos) {
else
code = pObj->pSql->res.code;
// for invalid sql, additional information is attached to explain why the sql is invalid
if (code == TSDB_CODE_INVALID_SQL) {
snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
strcpy(pObj->pSql->cmd.payload, temp);
// snprintf(temp, tListLen(temp), "invalid SQL: %s", pObj->pSql->cmd.payload);
// strcpy(pObj->pSql->cmd.payload, temp);
return pObj->pSql->cmd.payload;
} else {
return tsError[code];
......
......@@ -1294,8 +1294,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
// re-build the whole name string
if (pStr[firstPartLen] == TS_PATH_DELIMITER[0]) {
// first part do not have quote
// do nothing
// first part do not have quote do nothing
} else {
pStr[firstPartLen] = TS_PATH_DELIMITER[0];
memmove(&pStr[firstPartLen + 1], pToken->z, pToken->n);
......@@ -1842,5 +1841,30 @@ bool tscIsUpdateQuery(STscObj* pObj) {
SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command) ? 1 : 0;
}
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql) {
const char *msgFormat1 = "invalid SQL: %s";
const char *msgFormat2 = "invalid SQL: syntax error near \"%s\" (%s)";
const char *msgFormat3 = "invalid SQL: syntax error near \"%s\"";
const int32_t BACKWARD_CHAR_STEP = 0;
if (sql == NULL) {
assert(additionalInfo != NULL);
sprintf(msg, msgFormat1, additionalInfo);
return TSDB_CODE_INVALID_SQL;
}
char buf[64] = {0}; // only extract part of sql string
strncpy(buf, (sql - BACKWARD_CHAR_STEP), tListLen(buf) - 1);
if (additionalInfo != NULL) {
sprintf(msg, msgFormat2, buf, additionalInfo);
} else {
sprintf(msg, msgFormat3, buf); // no additional information for invalid sql error
}
return TSDB_CODE_INVALID_SQL;
}
......@@ -568,7 +568,6 @@ typedef struct {
typedef struct {
int32_t numOfRows;
int16_t precision;
int16_t compress;
int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds;
char data[];
......
......@@ -353,7 +353,7 @@ bool vnodeIsValidVnodeCfg(SVnodeCfg *pCfg);
int32_t vnodeGetResultSize(void *handle, int32_t *numOfRows);
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size);
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows);
int64_t vnodeGetOffsetVal(void *thandle);
......
......@@ -6943,36 +6943,18 @@ static int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **p
return numOfRes;
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int32_t *size) {
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SMeterObj *pObj = pQInfo->pObj;
SQuery * pQuery = &pQInfo->query;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
int32_t dataSize = pQInfo->query.rowSize * numOfRows;
if (dataSize >= tsCompressMsgSize && tsCompressMsgSize > 0) {
char *compBuf = malloc((size_t)dataSize);
// for metric query, bufIndex always be 0.
char *d = compBuf;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
memmove(d, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
d += bytes * numOfRows;
}
*size = tsCompressString(compBuf, dataSize, 1, data, dataSize + EXTRA_BYTES, 0, 0, 0);
dTrace("QInfo:%p compress rsp msg, before:%d, after:%d", pQInfo, dataSize, *size);
free(compBuf);
} else { // for metric query, bufIndex always be 0.
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock;
// for metric query, bufIndex always be 0.
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
data += bytes * numOfRows;
}
memmove(data, pQuery->sdata[col]->data + bytes * tnumOfRows * pQInfo->bufIndex, bytes * numOfRows);
data += bytes * numOfRows;
}
}
......@@ -6987,7 +6969,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
* @param numOfRows the number of rows that are not returned in current retrieve
* @return
*/
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, int32_t *size) {
int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) {
SQInfo *pQInfo = (SQInfo *)handle;
SQuery *pQuery = &pQInfo->query;
......@@ -7000,7 +6982,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
// make sure file exist
if (VALIDFD(fd)) {
size_t s = lseek(fd, 0, SEEK_END);
dTrace("QInfo:%p ts comp data return, file:%s, size:%ld", pQInfo, pQuery->sdata[0]->data, size);
dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
lseek(fd, 0, SEEK_SET);
read(fd, data, s);
......@@ -7012,7 +6994,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows, i
pQuery->sdata[0]->data, strerror(errno));
}
} else {
doCopyQueryResultToMsg(pQInfo, numOfRows, data, size);
doCopyQueryResultToMsg(pQInfo, numOfRows, data);
}
return numOfRows;
......
......@@ -850,7 +850,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
// the remained number of retrieved rows, not the interpolated result
int numOfRows = pQInfo->pointsRead - pQInfo->pointsReturned;
int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows, size);
int32_t numOfFinal = vnodeCopyQueryResultToMsg(pQInfo, data, numOfRows);
pQInfo->pointsReturned += numOfFinal;
dTrace("QInfo:%p %d are returned, totalReturned:%d totalRead:%d", pQInfo, numOfFinal, pQInfo->pointsReturned,
......@@ -862,12 +862,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
/*
* If SQInfo has been released, the value of signature cannot be equalled to
* the address of pQInfo, since in release function, the original value has
* been
* destroyed. However, this memory area may be reused by another function.
* It may be 0 or any value, but it is rarely still be equalled to the address
* of SQInfo.
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
* since in release function, the original value has been destroyed. However, this memory area may be reused
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
*/
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
......
......@@ -452,11 +452,7 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
pMsg = pRsp->data;
if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) {
int32_t oldSize = size;
vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size);
if (oldSize > size) {
pRsp->compress = htons(1); // denote that the response msg is compressed
}
}
pMsg += size;
......@@ -573,6 +569,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int sversion = htonl(pBlocks->sversion);
if (pSubmit->import) {
dTrace("start to import data");
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册