Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
580448e3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
580448e3
编写于
8月 31, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-621]
上级
f3c92ad3
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
134 addition
and
83 deletion
+134
-83
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+2
-2
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+11
-25
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+45
-20
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+2
-5
src/client/src/tscServer.c
src/client/src/tscServer.c
+5
-11
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+44
-6
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+25
-14
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
580448e3
...
...
@@ -186,7 +186,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
size_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
SSqlExpr
*
tscSqlExprGet
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
void
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
int32_t
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
void
tscSqlExprInfoDestroy
(
SArray
*
pExprInfo
);
SColumn
*
tscColumnClone
(
const
SColumn
*
src
);
...
...
@@ -204,7 +204,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
SCond
*
tsGetSTableQueryCond
(
STagCond
*
pCond
,
uint64_t
uid
);
void
tsSetSTableQueryCond
(
STagCond
*
pTagCond
,
uint64_t
uid
,
SBufferWriter
*
bw
);
void
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
int32_t
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
void
tscTagCondRelease
(
STagCond
*
pCond
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQueryInfo
*
pQueryInfo
);
...
...
src/client/src/tscAsync.c
浏览文件 @
580448e3
...
...
@@ -50,7 +50,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql
->
sqlstr
=
calloc
(
1
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscQueueAsyncError
(
pSql
->
fp
,
pSql
->
param
,
TSDB_CODE_TSC_OUT_OF_MEMORY
);
pSql
->
res
.
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
@@ -94,7 +95,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_TSC_OUT_OF_MEMORY
);
return
;
}
...
...
@@ -191,7 +191,7 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo
tscProcessAsyncRetrieveImpl
(
param
,
tres
,
numOfRows
,
tscAsyncFetchSingleRowProxy
);
}
void
taos_fetch_rows_a
(
TAOS_RES
*
taosa
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
int
)
,
void
*
param
)
{
void
taos_fetch_rows_a
(
TAOS_RES
*
taosa
,
__async_cb_func_t
fp
,
void
*
param
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
taosa
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
tscError
(
"sql object is NULL"
);
...
...
@@ -209,6 +209,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if
(
pRes
->
qhandle
==
0
)
{
tscError
(
"qhandle is NULL"
);
pRes
->
code
=
TSDB_CODE_TSC_INVALID_QHANDLE
;
pSql
->
param
=
param
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
@@ -269,7 +271,10 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if
(
pRes
->
qhandle
==
0
)
{
tscError
(
"qhandle is NULL"
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_TSC_INVALID_QHANDLE
);
pSql
->
param
=
param
;
pRes
->
code
=
TSDB_CODE_TSC_INVALID_QHANDLE
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
@@ -352,36 +357,17 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
void
tscProcessAsyncRes
(
SSchedMsg
*
pMsg
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
// SSqlCmd *pCmd = &pSql->cmd;
SSqlRes
*
pRes
=
&
pSql
->
res
;
// void *taosres = pSql;
// pCmd may be released, so cache pCmd->command
// int cmd = pCmd->command;
// int code = pRes->code;
// in case of async insert, restore the user specified callback function
// bool shouldFree = tscShouldBeFreed(pSql);
// if (pCmd->command == TSDB_SQL_INSERT) {
// assert(pSql->fp != NULL);
assert
(
pSql
->
fp
!=
NULL
&&
pSql
->
fetchFp
!=
NULL
);
// }
// if (pSql->fp) {
pSql
->
fp
=
pSql
->
fetchFp
;
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
pRes
->
code
);
// }
// if (shouldFree) {
// tscDebug("%p sqlObj is automatically freed in async res", pSql);
// tscFreeSqlObj(pSql);
// }
}
// this function will be executed by queue task threads, so the terrno is not valid
static
void
tscProcessAsyncError
(
SSchedMsg
*
pMsg
)
{
void
(
*
fp
)()
=
pMsg
->
ahandle
;
terrno
=
*
(
int32_t
*
)
pMsg
->
msg
;
(
*
fp
)(
pMsg
->
thandle
,
NULL
,
*
(
int32_t
*
)
pMsg
->
msg
);
}
...
...
src/client/src/tscLocal.c
浏览文件 @
580448e3
...
...
@@ -274,7 +274,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
return
tscSetValueToResObj
(
pSql
,
rowLen
);
}
static
void
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
static
int32_t
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
...
...
@@ -282,14 +282,20 @@ static void tscProcessCurrentUser(SSqlObj *pSql) {
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
char
*
vx
=
calloc
(
1
,
pExpr
->
resBytes
);
if
(
vx
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
size_t
size
=
sizeof
(
pSql
->
pTscObj
->
user
);
STR_WITH_MAXSIZE_TO_VARSTR
(
vx
,
pSql
->
pTscObj
->
user
,
size
);
tscSetLocalQueryResult
(
pSql
,
vx
,
pExpr
->
aliasName
,
pExpr
->
resType
,
pExpr
->
resBytes
);
free
(
vx
);
return
TSDB_CODE_SUCCESS
;
}
static
void
tscProcessCurrentDB
(
SSqlObj
*
pSql
)
{
static
int32_t
tscProcessCurrentDB
(
SSqlObj
*
pSql
)
{
char
db
[
TSDB_DB_NAME_LEN
]
=
{
0
};
extractDBName
(
pSql
->
pTscObj
->
db
,
db
);
...
...
@@ -302,6 +308,10 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
pExpr
->
resBytes
=
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
;
char
*
vx
=
calloc
(
1
,
pExpr
->
resBytes
);
if
(
vx
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
if
(
t
==
0
)
{
setVardataNull
(
vx
,
TSDB_DATA_TYPE_BINARY
);
}
else
{
...
...
@@ -310,9 +320,11 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
tscSetLocalQueryResult
(
pSql
,
vx
,
pExpr
->
aliasName
,
pExpr
->
resType
,
pExpr
->
resBytes
);
free
(
vx
);
return
TSDB_CODE_SUCCESS
;
}
static
void
tscProcessServerVer
(
SSqlObj
*
pSql
)
{
static
int32_t
tscProcessServerVer
(
SSqlObj
*
pSql
)
{
const
char
*
v
=
pSql
->
pTscObj
->
sversion
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
...
...
@@ -323,13 +335,18 @@ static void tscProcessServerVer(SSqlObj *pSql) {
pExpr
->
resBytes
=
(
int16_t
)(
t
+
VARSTR_HEADER_SIZE
);
char
*
vx
=
calloc
(
1
,
pExpr
->
resBytes
);
if
(
vx
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
STR_WITH_SIZE_TO_VARSTR
(
vx
,
v
,
(
VarDataLenT
)
t
);
tscSetLocalQueryResult
(
pSql
,
vx
,
pExpr
->
aliasName
,
pExpr
->
resType
,
pExpr
->
resBytes
);
taosTFree
(
vx
);
free
(
vx
);
return
TSDB_CODE_SUCCESS
;
}
static
void
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
static
int32_t
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
...
...
@@ -339,23 +356,28 @@ static void tscProcessClientVer(SSqlObj *pSql) {
pExpr
->
resBytes
=
(
int16_t
)(
t
+
VARSTR_HEADER_SIZE
);
char
*
v
=
calloc
(
1
,
pExpr
->
resBytes
);
if
(
v
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
STR_WITH_SIZE_TO_VARSTR
(
v
,
version
,
(
VarDataLenT
)
t
);
tscSetLocalQueryResult
(
pSql
,
v
,
pExpr
->
aliasName
,
pExpr
->
resType
,
pExpr
->
resBytes
);
taosTFree
(
v
);
free
(
v
);
return
TSDB_CODE_SUCCESS
;
}
static
void
tscProcessServStatus
(
SSqlObj
*
pSql
)
{
static
int32_t
tscProcessServStatus
(
SSqlObj
*
pSql
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
if
(
pObj
->
pHb
!=
NULL
)
{
if
(
pObj
->
pHb
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
;
return
pSql
->
res
.
code
;
}
}
else
{
if
(
pSql
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
return
;
return
pSql
->
res
.
code
;
}
}
...
...
@@ -364,6 +386,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
int32_t
val
=
1
;
tscSetLocalQueryResult
(
pSql
,
(
char
*
)
&
val
,
pExpr
->
aliasName
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
));
return
TSDB_CODE_SUCCESS
;
}
void
tscSetLocalQueryResult
(
SSqlObj
*
pSql
,
const
char
*
val
,
const
char
*
columnName
,
int16_t
type
,
size_t
valueLength
)
{
...
...
@@ -393,37 +416,39 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
int
tscProcessLocalCmd
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pCmd
->
command
==
TSDB_SQL_CFG_LOCAL
)
{
p
Sql
->
res
.
code
=
(
uint8_t
)
taosCfgDynamicOptions
(
pCmd
->
payload
);
p
Res
->
code
=
(
uint8_t
)
taosCfgDynamicOptions
(
pCmd
->
payload
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_DESCRIBE_TABLE
)
{
p
Sql
->
res
.
code
=
(
uint8_t
)
tscProcessDescribeTable
(
pSql
);
p
Res
->
code
=
(
uint8_t
)
tscProcessDescribeTable
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
)
{
/*
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
* free allocated resources and remove the SqlObj from sql query linked list
*/
p
Sql
->
res
.
qhandle
=
0x1
;
p
Sql
->
res
.
numOfRows
=
0
;
p
Res
->
qhandle
=
0x1
;
p
Res
->
numOfRows
=
0
;
}
else
if
(
pCmd
->
command
==
TSDB_SQL_RESET_CACHE
)
{
taosCacheEmpty
(
tscCacheHandle
);
pRes
->
code
=
TSDB_CODE_SUCCESS
;
}
else
if
(
pCmd
->
command
==
TSDB_SQL_SERV_VERSION
)
{
tscProcessServerVer
(
pSql
);
pRes
->
code
=
tscProcessServerVer
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_CLI_VERSION
)
{
tscProcessClientVer
(
pSql
);
pRes
->
code
=
tscProcessClientVer
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_CURRENT_USER
)
{
tscProcessCurrentUser
(
pSql
);
pRes
->
code
=
tscProcessCurrentUser
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_CURRENT_DB
)
{
tscProcessCurrentDB
(
pSql
);
pRes
->
code
=
tscProcessCurrentDB
(
pSql
);
}
else
if
(
pCmd
->
command
==
TSDB_SQL_SERV_STATUS
)
{
tscProcessServStatus
(
pSql
);
pRes
->
code
=
tscProcessServStatus
(
pSql
);
}
else
{
p
Sql
->
res
.
code
=
TSDB_CODE_TSC_INVALID_SQL
;
p
Res
->
code
=
TSDB_CODE_TSC_INVALID_SQL
;
tscError
(
"%p not support command:%d"
,
pSql
,
pCmd
->
command
);
}
// keep the code in local variable in order to avoid invalid read in case of async query
int32_t
code
=
p
Sql
->
res
.
code
;
int32_t
code
=
p
Res
->
code
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
code
);
}
else
{
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
580448e3
...
...
@@ -67,8 +67,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
SQLFunctionCtx
*
pCtx
=
&
pReducer
->
pCtx
[
i
];
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
pCtx
->
aOutputBuf
=
pReducer
->
pResultBuf
->
data
+
pExpr
->
offset
*
pReducer
->
resColModel
->
capacity
;
pCtx
->
aOutputBuf
=
pReducer
->
pResultBuf
->
data
+
pExpr
->
offset
*
pReducer
->
resColModel
->
capacity
;
pCtx
->
order
=
pQueryInfo
->
order
.
order
;
pCtx
->
functionId
=
pExpr
->
functionId
;
...
...
@@ -160,7 +159,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if
(
pMemBuffer
==
NULL
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
numOfBuffer
);
tscError
(
"%p pMemBuffer is NULL"
,
pMemBuffer
);
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
return
;
...
...
@@ -168,7 +166,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if
(
pDesc
->
pColumnModel
==
NULL
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
numOfBuffer
);
tscError
(
"%p no local buffer or intermediate result format model"
,
pSql
);
pRes
->
code
=
TSDB_CODE_TSC_APP_ERROR
;
return
;
...
...
@@ -188,7 +185,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if
(
numOfFlush
==
0
||
numOfBuffer
==
0
)
{
tscLocalReducerEnvDestroy
(
pMemBuffer
,
pDesc
,
finalmodel
,
numOfBuffer
);
tscDebug
(
"%p retrieved no data"
,
pSql
);
return
;
}
...
...
@@ -279,6 +275,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
taosTFree
(
pReducer
);
return
;
}
param
->
pLocalData
=
pReducer
->
pLocalDataSrc
;
param
->
pDesc
=
pReducer
->
pDesc
;
param
->
num
=
pReducer
->
pLocalDataSrc
[
0
]
->
pMemBuffer
->
numOfElemsPerPage
;
...
...
src/client/src/tscServer.c
浏览文件 @
580448e3
...
...
@@ -226,17 +226,13 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.
handle
=
&
pSql
->
pRpcCtx
,
.
code
=
0
};
// NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
if
(
pObj
!=
NULL
&&
pObj
->
signature
==
pObj
)
{
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
else
{
//pObj->signature has been reset by other thread, ignore concurrency problem
return
TSDB_CODE_TSC_CONN_KILLED
;
}
rpcSendRequest
(
pObj
->
pDnodeConn
,
&
pSql
->
epSet
,
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcEpSet
*
pEpSet
)
{
...
...
@@ -1495,8 +1491,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char
*
tmpData
=
NULL
;
uint32_t
len
=
pSql
->
cmd
.
payloadLen
;
if
(
len
>
0
)
{
tmpData
=
calloc
(
1
,
len
);
if
(
NULL
==
tmpData
)
{
if
((
tmpData
=
calloc
(
1
,
len
))
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -1541,8 +1536,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// copy payload content to temp buff
char *tmpData = 0;
if (pCmd->payloadLen > 0) {
tmpData = calloc(1, pCmd->payloadLen + 1);
if (NULL == tmpData) return -1;
if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
580448e3
...
...
@@ -570,8 +570,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
SSchema
*
pColSchema
=
tscGetTableColumnSchemaById
(
pTableMetaInfo
->
pTableMeta
,
tagColId
);
*
s1
=
taosArrayInit
(
p1
->
num
,
p1
->
tagSize
);
*
s2
=
taosArrayInit
(
p2
->
num
,
p2
->
tagSize
);
// int16_t for padding
*
s1
=
taosArrayInit
(
p1
->
num
,
p1
->
tagSize
-
sizeof
(
int16_t
));
*
s2
=
taosArrayInit
(
p2
->
num
,
p2
->
tagSize
-
sizeof
(
int16_t
));
if
(
!
(
checkForDuplicateTagVal
(
pQueryInfo
,
p1
,
pParentSql
)
&&
checkForDuplicateTagVal
(
pQueryInfo
,
p2
,
pParentSql
)))
{
return
TSDB_CODE_QRY_DUP_JOIN_KEY
;
...
...
@@ -1039,6 +1040,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
int32_t
numOfExprs
=
(
int32_t
)
tscSqlExprNumOfExprs
(
pQueryInfo
);
pRes
->
pColumnIndex
=
calloc
(
1
,
sizeof
(
SColumnIndex
)
*
numOfExprs
);
if
(
pRes
->
pColumnIndex
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
;
}
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
...
...
@@ -1153,6 +1158,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code);
static
SSqlObj
*
tscCreateSTableSubquery
(
SSqlObj
*
pSql
,
SRetrieveSupport
*
trsupport
,
SSqlObj
*
prevSqlObj
);
// TODO
int32_t
tscLaunchJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
SJoinSupporter
*
pSupporter
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
...
...
@@ -1199,7 +1205,9 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
// this data needs to be transfer to support struct
memset
(
&
pNewQueryInfo
->
fieldsInfo
,
0
,
sizeof
(
SFieldInfo
));
tscTagCondCopy
(
&
pSupporter
->
tagCond
,
&
pNewQueryInfo
->
tagCond
);
//pNewQueryInfo->tagCond;
if
(
tscTagCondCopy
(
&
pSupporter
->
tagCond
,
&
pNewQueryInfo
->
tagCond
)
!=
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pNew
->
cmd
.
numOfCols
=
0
;
pNewQueryInfo
->
intervalTime
=
0
;
...
...
@@ -1380,7 +1388,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
const
uint32_t
nBufferSize
=
(
1u
<<
16
);
// 64KB
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
pSql
->
numOfSubs
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
...
...
@@ -1395,9 +1403,20 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
}
pSql
->
pSubs
=
calloc
(
pSql
->
numOfSubs
,
POINTER_BYTES
);
tscDebug
(
"%p retrieved query data from %d vnode(s)"
,
pSql
,
pSql
->
numOfSubs
);
SSubqueryState
*
pState
=
calloc
(
1
,
sizeof
(
SSubqueryState
));
if
(
pSql
->
pSubs
==
NULL
||
pState
==
NULL
)
{
taosTFree
(
pState
);
taosTFree
(
pSql
->
pSubs
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscLocalReducerEnvDestroy
(
pMemoryBuf
,
pDesc
,
pModel
,
pSql
->
numOfSubs
);
tscQueueAsyncRes
(
pSql
);
return
ret
;
}
pState
->
numOfTotal
=
pSql
->
numOfSubs
;
pState
->
numOfRemain
=
pSql
->
numOfSubs
;
...
...
@@ -2029,8 +2048,21 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
numOfRes
=
(
int32_t
)(
MIN
(
numOfRes
,
pSql
->
pSubs
[
i
]
->
res
.
numOfRows
));
}
if
(
numOfRes
==
0
)
{
return
;
}
int32_t
totalSize
=
tscGetResRowLength
(
pQueryInfo
->
exprList
);
pRes
->
pRsp
=
realloc
(
pRes
->
pRsp
,
numOfRes
*
totalSize
);
assert
(
numOfRes
*
totalSize
>
0
);
char
*
tmp
=
realloc
(
pRes
->
pRsp
,
numOfRes
*
totalSize
);
if
(
tmp
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
;
}
else
{
pRes
->
pRsp
=
tmp
;
}
pRes
->
data
=
pRes
->
pRsp
;
char
*
data
=
pRes
->
data
;
...
...
@@ -2069,6 +2101,12 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
pRes
->
buffer
=
calloc
(
numOfExprs
,
POINTER_BYTES
);
pRes
->
length
=
calloc
(
numOfExprs
,
sizeof
(
int32_t
));
if
(
pRes
->
tsrow
==
NULL
||
pRes
->
buffer
==
NULL
||
pRes
->
length
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tscQueueAsyncRes
(
pSql
);
return
;
}
tscRestoreSQLFuncForSTableQuery
(
pQueryInfo
);
}
...
...
src/client/src/tscUtil.c
浏览文件 @
580448e3
...
...
@@ -254,15 +254,12 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes
->
numOfCols
=
numOfOutput
;
pRes
->
tsrow
=
calloc
(
numOfOutput
,
POINTER_BYTES
);
pRes
->
length
=
calloc
(
numOfOutput
,
sizeof
(
int32_t
));
// todo refactor
pRes
->
length
=
calloc
(
numOfOutput
,
sizeof
(
int32_t
));
pRes
->
buffer
=
calloc
(
numOfOutput
,
POINTER_BYTES
);
// not enough memory
if
(
pRes
->
tsrow
==
NULL
||
(
pRes
->
buffer
==
NULL
&&
pRes
->
numOfCols
>
0
))
{
taosTFree
(
pRes
->
tsrow
);
taosTFree
(
pRes
->
buffer
);
taosTFree
(
pRes
->
length
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
pRes
->
code
;
}
...
...
@@ -281,13 +278,14 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
}
taosTFree
(
pRes
->
pRsp
);
taosTFree
(
pRes
->
tsrow
);
taosTFree
(
pRes
->
length
);
taosTFree
(
pRes
->
buffer
);
taosTFree
(
pRes
->
pGroupRec
);
taosTFree
(
pRes
->
pColumnIndex
);
taosTFree
(
pRes
->
buffer
);
if
(
pRes
->
pArithSup
!=
NULL
)
{
taosTFree
(
pRes
->
pArithSup
->
data
);
taosTFree
(
pRes
->
pArithSup
);
...
...
@@ -1052,7 +1050,7 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) {
taosArrayDestroy
(
pExprInfo
);
}
void
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
)
{
int32_t
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
)
{
assert
(
src
!=
NULL
&&
dst
!=
NULL
);
size_t
size
=
taosArrayGetSize
(
src
);
...
...
@@ -1064,7 +1062,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
if
(
deepcopy
)
{
SSqlExpr
*
p1
=
calloc
(
1
,
sizeof
(
SSqlExpr
));
if
(
p1
==
NULL
)
{
assert
(
0
)
;
return
-
1
;
}
*
p1
=
*
pExpr
;
...
...
@@ -1078,6 +1076,8 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
}
}
}
return
0
;
}
SColumn
*
tscColumnListInsert
(
SArray
*
pColumnList
,
SColumnIndex
*
pColIndex
)
{
...
...
@@ -1324,11 +1324,14 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return
false
;
}
void
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
)
{
int32_t
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
)
{
memset
(
dest
,
0
,
sizeof
(
STagCond
));
if
(
src
->
tbnameCond
.
cond
!=
NULL
)
{
dest
->
tbnameCond
.
cond
=
strdup
(
src
->
tbnameCond
.
cond
);
if
(
dest
->
tbnameCond
.
cond
==
NULL
)
{
return
-
1
;
}
}
dest
->
tbnameCond
.
uid
=
src
->
tbnameCond
.
uid
;
...
...
@@ -1337,7 +1340,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
dest
->
relType
=
src
->
relType
;
if
(
src
->
pCond
==
NULL
)
{
return
;
return
0
;
}
size_t
s
=
taosArrayGetSize
(
src
->
pCond
);
...
...
@@ -1354,7 +1357,7 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
assert
(
pCond
->
cond
!=
NULL
);
c
.
cond
=
malloc
(
c
.
len
);
if
(
c
.
cond
==
NULL
)
{
assert
(
0
)
;
return
-
1
;
}
memcpy
(
c
.
cond
,
pCond
->
cond
,
c
.
len
);
...
...
@@ -1362,6 +1365,8 @@ void tscTagCondCopy(STagCond* dest, const STagCond* src) {
taosArrayPush
(
dest
->
pCond
,
&
c
);
}
return
0
;
}
void
tscTagCondRelease
(
STagCond
*
pTagCond
)
{
...
...
@@ -1854,7 +1859,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
}
tscTagCondCopy
(
&
pNewQueryInfo
->
tagCond
,
&
pQueryInfo
->
tagCond
);
if
(
tscTagCondCopy
(
&
pNewQueryInfo
->
tagCond
,
&
pQueryInfo
->
tagCond
)
!=
0
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
)
{
pNewQueryInfo
->
fillVal
=
malloc
(
pQueryInfo
->
fieldsInfo
.
numOfOutput
*
sizeof
(
int64_t
));
...
...
@@ -1883,7 +1891,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
uint64_t
uid
=
pTableMetaInfo
->
pTableMeta
->
id
.
uid
;
tscSqlExprCopy
(
pNewQueryInfo
->
exprList
,
pQueryInfo
->
exprList
,
uid
,
true
);
if
(
tscSqlExprCopy
(
pNewQueryInfo
->
exprList
,
pQueryInfo
->
exprList
,
uid
,
true
)
!=
0
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
doSetSqlExprAndResultFieldInfo
(
pQueryInfo
,
pNewQueryInfo
,
uid
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录