Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0019048a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0019048a
编写于
4月 04, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-32] change the synchronized query execution method.
上级
58258bc9
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
57 addition
and
79 deletion
+57
-79
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+0
-1
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+3
-5
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+0
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+8
-6
src/client/src/tscSql.c
src/client/src/tscSql.c
+27
-31
src/client/src/tscSub.c
src/client/src/tscSub.c
+0
-1
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+7
-13
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+3
-12
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+8
-8
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
0019048a
...
...
@@ -357,7 +357,6 @@ typedef struct SSqlObj {
char
freed
:
4
;
char
listed
:
4
;
tsem_t
rspSem
;
tsem_t
emptyRspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
uint8_t
numOfSubs
;
...
...
src/client/src/tscAsync.c
浏览文件 @
0019048a
...
...
@@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
tscError
(
"failed to malloc payload"
);
tfree
(
pSql
);
tscQueueAsyncError
(
fp
,
param
);
return
;
}
pSql
->
sqlstr
=
malloc
(
sqlLen
+
1
);
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscQueueAsyncError
(
fp
,
param
);
free
(
pCmd
->
payload
);
free
(
pSql
);
return
;
}
...
...
@@ -412,7 +410,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if
(
code
!=
0
)
{
pRes
->
code
=
code
;
tscTrace
(
"%p failed to renew tableMeta"
,
pSql
);
tsem_post
(
&
pSql
->
rspSem
);
//
tsem_post(&pSql->rspSem);
}
else
{
tscTrace
(
"%p renew tableMeta successfully, command:%d, code:%d, retry:%d"
,
pSql
,
pSql
->
cmd
.
command
,
pSql
->
res
.
code
,
pSql
->
retry
);
...
...
@@ -424,7 +422,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code
=
tscSendMsgToServer
(
pSql
);
if
(
code
!=
0
)
{
pRes
->
code
=
code
;
tsem_post
(
&
pSql
->
rspSem
);
//
tsem_post(&pSql->rspSem);
}
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
0019048a
...
...
@@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
}
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
emptyRspSem
,
0
,
1
);
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
0019048a
...
...
@@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if
(
pQueryInfo
->
numOfTables
==
0
)
{
pTableMetaInfo
=
tscAddEmptyMetaInfo
(
pQueryInfo
);
}
else
{
pTableMetaInfo
=
&
pQueryInfo
->
pTableMetaInfo
[
0
];
pTableMetaInfo
=
pQueryInfo
->
pTableMetaInfo
[
0
];
}
pCmd
->
command
=
pInfo
->
type
;
...
...
src/client/src/tscServer.c
浏览文件 @
0019048a
...
...
@@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pRes
->
rspType
=
rpcMsg
->
msgType
;
pRes
->
rspLen
=
rpcMsg
->
contLen
;
if
(
pRes
->
rspLen
>
0
)
{
char
*
tmp
=
(
char
*
)
realloc
(
pRes
->
pRsp
,
pRes
->
rspLen
);
if
(
tmp
==
NULL
)
{
pRes
->
code
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
else
{
pRes
->
pRsp
=
tmp
;
if
(
pRes
->
rspLen
)
{
memcpy
(
pRes
->
pRsp
,
rpcMsg
->
pCont
,
pRes
->
rspLen
);
}
}
else
{
pRes
->
pRsp
=
NULL
;
}
// ignore the error information returned from mnode when set ignore flag in sql
...
...
src/client/src/tscSql.c
浏览文件 @
0019048a
...
...
@@ -268,9 +268,10 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
}
static
void
syncQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
param
!=
NULL
);
STscObj
*
pObj
=
(
STscObj
*
)
param
;
assert
(
pObj
!=
NULL
&&
pObj
->
pSql
!=
NULL
);
assert
(
pObj
->
pSql
!=
NULL
);
sem_post
(
&
pObj
->
pSql
->
rspSem
);
}
...
...
@@ -281,14 +282,7 @@ int taos_query(TAOS *taos, const char *sqlstr) {
return
TSDB_CODE_DISCONNECTED
;
}
SSqlObj
*
pSql
=
(
SSqlObj
*
)
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
pObj
->
pSql
=
pSql
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
SSqlObj
*
pSql
=
pObj
->
pSql
;
int32_t
sqlLen
=
strlen
(
sqlstr
);
doAsyncQuery
(
pObj
,
pObj
->
pSql
,
syncQueryCallback
,
taos
,
sqlstr
,
sqlLen
);
...
...
@@ -651,10 +645,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
static
void
asyncFetchCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
tres
;
if
(
numOfRows
<
0
)
{
// set the error code
pSql
->
res
.
code
=
-
numOfRows
;
}
sem_post
(
&
pSql
->
rspSem
);
}
...
...
@@ -754,20 +748,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if
(
pRes
==
NULL
||
pRes
->
qhandle
==
0
)
{
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace
(
"%p qhandle is null, abort free, fp:%p"
,
pSql
,
pSql
->
fp
);
if
(
pSql
->
fp
!=
NULL
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
if
(
pSql
==
pObj
->
pSql
)
{
pObj
->
pSql
=
NULL
;
if
(
tscShouldFreeAsyncSqlObj
(
pSql
))
{
tscFreeSqlObj
(
pSql
);
}
tscTrace
(
"%p Async SqlObj is freed by app"
,
pSql
);
}
else
if
(
keepCmd
)
{
}
else
{
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
}
}
return
;
}
...
...
@@ -836,17 +828,20 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
}
}
else
{
// if no free resource msg is sent to vnode, we free this object immediately.
if
(
pSql
->
fp
)
{
bool
free
=
tscShouldFreeAsyncSqlObj
(
pSql
);
if
(
free
)
{
assert
(
pRes
->
numOfRows
==
0
||
(
pCmd
->
command
>
TSDB_SQL_LOCAL
));
tscFreeSqlObj
(
pSql
);
tscTrace
(
"%p Async sql result is freed by app"
,
pSql
);
}
else
if
(
keepCmd
)
{
}
else
{
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
tscTrace
(
"%p sql result is freed while sql command is kept"
,
pSql
);
}
else
{
tscFreeSqlObjPartial
(
pSql
);
tscTrace
(
"%p sql result is freed"
,
pSql
);
tscTrace
(
"%p sql result is freed by app"
,
pSql
);
}
}
}
}
...
...
@@ -892,9 +887,10 @@ char *taos_errstr(TAOS *taos) {
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
(
char
*
)
tstrerror
(
globalCode
);
return
(
char
*
)
tstrerror
(
terrno
);
SSqlObj
*
pSql
=
pObj
->
pSql
;
SSqlObj
*
pSql
=
pObj
->
pSql
;
if
(
hasAdditionalErrorInfo
(
pSql
->
res
.
code
,
&
pSql
->
cmd
))
{
return
pSql
->
cmd
.
payload
;
}
else
{
...
...
src/client/src/tscSub.c
浏览文件 @
0019048a
...
...
@@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql
->
sqlstr
=
sqlstr
;
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
emptyRspSem
,
0
,
1
);
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
1
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
0019048a
...
...
@@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) {
static
void
doQuitSubquery
(
SSqlObj
*
pParentSql
)
{
freeSubqueryObj
(
pParentSql
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
//
tsem_wait(&pParentSql->emptyRspSem);
//
tsem_wait(&pParentSql->emptyRspSem);
tsem_post
(
&
pParentSql
->
rspSem
);
//
tsem_post(&pParentSql->rspSem);
}
static
void
quitAllSubquery
(
SSqlObj
*
pSqlObj
,
SJoinSubquerySupporter
*
pSupporter
)
{
...
...
@@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
freeSubqueryObj
(
pParentSql
);
}
tsem_post
(
&
pParentSql
->
rspSem
);
//
tsem_post(&pParentSql->rspSem);
}
else
{
tscTrace
(
"%p sub:%p completed, completed:%d, total:%d"
,
pParentSql
,
tres
,
finished
,
numOfTotal
);
}
...
...
@@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
}
// wait for all subquery completed
tsem_wait
(
&
pSql
->
rspSem
);
//
tsem_wait(&pSql->rspSem);
// update the records for each subquery
for
(
int32_t
i
=
0
;
i
<
pSql
->
numOfSubs
;
++
i
)
{
...
...
@@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscProcessSql
(
pSql
);
}
else
{
// first retrieve from vnode during the secondary stage sub-query
if
(
pParentSql
->
fp
==
NULL
)
{
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_wait
(
&
pParentSql
->
emptyRspSem
);
tsem_post
(
&
pParentSql
->
rspSem
);
// tsem_post(&pParentSql->rspSem);
}
else
{
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
...
...
@@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
}
tsem_post
(
&
pSql
->
emptyRspSem
);
tsem_wait
(
&
pSql
->
rspSem
);
tsem_post
(
&
pSql
->
emptyRspSem
);
// tsem_wait(&pSql->rspSem);
if
(
pSql
->
numOfSubs
<=
0
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
...
...
src/client/src/tscSystem.c
浏览文件 @
0019048a
...
...
@@ -40,15 +40,10 @@ void * tscQhandle;
void
*
tscCheckDiskUsageTmr
;
int
tsInsertHeadSize
;
extern
int
tscEmbedded
;
int
tscNumOfThreads
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
static
pthread_mutex_t
tscMutex
;
extern
int
tsTscEnableRecordSql
;
extern
int
tsNumOfLogLines
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
void
taosInitNote
(
int
numOfNoteLines
,
int
maxNotes
,
char
*
lable
);
void
deltaToUtcInitOnce
();
void
tscCheckDiskUsage
(
void
*
para
,
void
*
unused
)
{
taosGetDisk
();
...
...
@@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
char
secretEncrypt
[
32
]
=
{
0
};
taosEncryptPass
((
uint8_t
*
)
secret
,
strlen
(
secret
),
secretEncrypt
);
pthread_mutex_lock
(
&
tscMutex
);
if
(
pVnodeConn
==
NULL
)
{
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsLocalIp
;
...
...
@@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pVnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pVnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to vnode"
);
pthread_mutex_unlock
(
&
tscMutex
);
return
-
1
;
}
}
...
...
@@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pTscMgmtConn
=
rpcOpen
(
&
rpcInit
);
if
(
pTscMgmtConn
==
NULL
)
{
tscError
(
"failed to init connection to mgmt"
);
pthread_mutex_unlock
(
&
tscMutex
);
return
-
1
;
}
}
pthread_mutex_unlock
(
&
tscMutex
);
return
0
;
}
...
...
@@ -113,7 +104,7 @@ void taos_init_imp() {
char
temp
[
128
];
struct
stat
dirstat
;
pthread_mutex_init
(
&
tscMutex
,
NULL
)
;
errno
=
TSDB_CODE_SUCCESS
;
srand
(
taosGetTimestampSec
());
deltaToUtcInitOnce
();
...
...
src/client/src/tscUtil.c
浏览文件 @
0019048a
...
...
@@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
pSql
->
freed
=
0
;
tscFreeSqlCmdData
(
pCmd
);
tscTrace
(
"%p
free sqlObj partial
completed"
,
pSql
);
tscTrace
(
"%p
partially free sqlObj
completed"
,
pSql
);
}
void
tscFreeSqlObj
(
SSqlObj
*
pSql
)
{
...
...
@@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tfree
(
pCmd
->
payload
);
pCmd
->
allocSize
=
0
;
tsem_destroy
(
&
pSql
->
rspSem
);
free
(
pSql
);
}
...
...
@@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) {
taosTmrStopA
(
&
(
pObj
->
pTimer
));
tscFreeSqlObj
(
pSql
);
sem_destroy
(
&
pSql
->
rspSem
);
pthread_mutex_destroy
(
&
pObj
->
mutex
);
tscTrace
(
"%p DB connection is closed"
,
pObj
);
tfree
(
pObj
);
}
...
...
@@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
if
(
pCmd
->
payload
==
NULL
)
{
assert
(
pCmd
->
allocSize
==
0
);
pCmd
->
payload
=
(
char
*
)
malloc
(
size
);
pCmd
->
payload
=
(
char
*
)
calloc
(
1
,
size
);
if
(
pCmd
->
payload
==
NULL
)
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
pCmd
->
allocSize
=
size
;
memset
(
pCmd
->
payload
,
0
,
pCmd
->
allocSize
);
}
else
{
if
(
pCmd
->
allocSize
<
size
)
{
char
*
b
=
realloc
(
pCmd
->
payload
,
size
);
...
...
@@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd
->
payload
=
b
;
pCmd
->
allocSize
=
size
;
}
memset
(
pCmd
->
payload
,
0
,
pCmd
->
payloadLen
);
}
//memset(pCmd->payload, 0, pCmd->allocSize);
...
...
@@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
)
{
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
||
pTscObj
->
pSql
==
pSql
)
{
return
false
;
}
...
...
@@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
pTableMetaInfo
->
pTableMeta
=
pTableMeta
;
// pTableMetaInfo->pMetricMeta = pMetricMeta;
pTableMetaInfo
->
numOfTags
=
numOfTags
;
if
(
tags
!=
NULL
)
{
...
...
@@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
}
void
tscRemoveAllMeterMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
)
{
tscTrace
(
"%p deref the
metric/meter
meta in cache, numOfTables:%d"
,
address
,
pQueryInfo
->
numOfTables
);
tscTrace
(
"%p deref the
table
meta in cache, numOfTables:%d"
,
address
,
pQueryInfo
->
numOfTables
);
int32_t
index
=
pQueryInfo
->
numOfTables
;
while
(
index
>=
0
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录