Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
xindoo
redis
提交
f771dc23
R
redis
项目概览
xindoo
/
redis
通知
2
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
redis
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f771dc23
编写于
1月 05, 2011
作者:
A
antirez
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
IO performances greatly improved under high writes load
上级
6eaad663
变更
2
隐藏空白更改
内联
并排
Showing
2 changed files
with
113 additions
and
80 deletions
+113
-80
src/dscache.c
src/dscache.c
+109
-80
src/redis.h
src/redis.h
+4
-0
未找到文件。
src/dscache.c
浏览文件 @
f771dc23
...
...
@@ -107,7 +107,8 @@
*/
void
spawnIOThread
(
void
);
int
cacheScheduleIOPushJobs
(
int
onlyloads
);
int
cacheScheduleIOPushJobs
(
int
flags
);
int
processActiveIOJobs
(
int
max
);
/* =================== Virtual Memory - Blocking Side ====================== */
...
...
@@ -216,9 +217,9 @@ int cacheFreeOneEntry(void) {
* otherwise we'll use an infinite amount of memory if changes to
* the dataset are faster than I/O */
if
(
listLength
(
server
.
cache_io_queue
)
>
0
)
{
cacheScheduleIOPushJobs
(
0
);
waitEmptyIOJobsQueue
(
);
processA
llPendingIOJobs
(
);
redisLog
(
REDIS_DEBUG
,
"--- Busy waiting IO to reclaim memory"
);
cacheScheduleIOPushJobs
(
REDIS_IO_ASAP
);
processA
ctiveIOJobs
(
1
);
return
REDIS_OK
;
}
/* Nothing to free at all... */
...
...
@@ -484,62 +485,86 @@ void spawnIOThread(void) {
server
.
io_active_threads
++
;
}
/* Wait until up to 'max' pending IO Jobs are processed by the I/O thread.
 * From our point of view an IO job is "processed" once the count in
 * server.io_processed increases by one.
 *
 * If max is -1, all the pending IO jobs will be processed.
 *
 * Returns the number of IO jobs processed.
 *
 * NOTE: while this may appear like a busy loop, we are actually blocked
 * by IO since we continuously acquire/release the IO lock. */
int processActiveIOJobs(int max) {
    int processed = 0;

    while (max == -1 || max > 0) {
        int io_processed_len;

        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0 &&
            listLength(server.io_processing) == 0)
        {
            /* There is nothing more to process */
            unlockThreadedIO();
            break;
        }

#if 0
        /* If there are new jobs we need to signal the thread to
         * process the next one. */
        redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
            listLength(server.io_newjobs),
            listLength(server.io_processing));
        /* FIXME: signal or not?
        if (listLength(server.io_newjobs)) {
            pthread_cond_signal(&server.io_condvar);
        }
        */
        /* While waiting for the empty-jobs-queue condition we post-process
         * some finished jobs, as I/O threads may be hanging trying to write
         * against the io_ready_pipe_write FD but there are so many pending
         * jobs that it's blocking. */
#endif

        /* Check if we can process some finished job */
        io_processed_len = listLength(server.io_processed);
        unlockThreadedIO();
        if (io_processed_len) {
            vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
                                                        (void*)0xdeadbeef,0);
            processed++;
            if (max != -1) max--;
        }
    }
    return processed;
}
/* Wait until every pending IO Job has been processed by the I/O thread.
 * Simple wrapper: processActiveIOJobs() with max == -1 loops until both
 * io_newjobs and io_processing are empty. */
void waitEmptyIOJobsQueue(void) {
    processActiveIOJobs(-1);
}
/* Process up to 'max' IO Jobs already completed by threads but still waiting
 * for post-processing by the main thread.
 *
 * If max == -1 all the pending jobs are processed.
 *
 * The number of processed jobs is returned. */
int processPendingIOJobs(int max) {
    int processed = 0;

    while (max == -1 || max > 0) {
        int io_processed_len;

        /* Snapshot the completed-jobs queue length under the IO lock;
         * the actual post-processing below runs unlocked. */
        lockThreadedIO();
        io_processed_len = listLength(server.io_processed);
        unlockThreadedIO();
        if (io_processed_len == 0) break;
        vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
                                                    (void*)0xdeadbeef,0);
        if (max != -1) max--;
        processed++;
    }
    return processed;
}
/* Process every IO Job already completed by threads and still awaiting
 * main-thread post-processing. Wrapper for processPendingIOJobs(-1). */
void processAllPendingIOJobs(void) {
    processPendingIOJobs(-1);
}
/* This function must be called while with threaded IO locked */
...
...
@@ -665,7 +690,7 @@ void cacheScheduleIO(redisDb *db, robj *key, int type) {
* in queue for the same key. */
if
(
type
==
REDIS_IO_LOAD
&&
!
(
flags
&
REDIS_IO_SAVE
))
{
listAddNodeHead
(
server
.
cache_io_queue
,
op
);
cacheScheduleIOPushJobs
(
1
);
cacheScheduleIOPushJobs
(
REDIS_IO_ONLYLOADS
);
}
else
{
/* FIXME: probably when this happens we want to at least move
* the write job about this queue on top, and set the creation time
...
...
@@ -675,13 +700,19 @@ void cacheScheduleIO(redisDb *db, robj *key, int type) {
}
/* Push scheduled IO operations into IO Jobs that the IO thread can process.
* If 'onlyloads' is true only IO_LOAD jobs are processed: this is useful
* since it's safe to push LOAD IO jobs from any place of the code, while
*
* If flags include REDIS_IO_ONLYLOADS only load jobs are processed: this is
* useful since it's safe to push LOAD IO jobs from any place of the code, while
* SAVE io jobs should never be pushed while we are processing a command
* (not protected by lookupKey() that will block on keys in IO_SAVEINPROG
* state. */
* state.
*
* The REDIS_IO_ASAP flag tells the function not to wait for the IO job
* scheduled completion time, but just to do the operation ASAP. This is useful
* when we need to reclaim memory from the IO queue.
*/
#define MAX_IO_JOBS_QUEUE 100
int
cacheScheduleIOPushJobs
(
int
onlyload
s
)
{
int
cacheScheduleIOPushJobs
(
int
flag
s
)
{
time_t
now
=
time
(
NULL
);
listNode
*
ln
;
int
jobs
,
topush
=
0
,
pushed
=
0
;
...
...
@@ -699,66 +730,64 @@ int cacheScheduleIOPushJobs(int onlyloads) {
while
((
ln
=
listFirst
(
server
.
cache_io_queue
))
!=
NULL
)
{
ioop
*
op
=
ln
->
value
;
struct
dictEntry
*
de
;
robj
*
val
;
if
(
!
topush
)
break
;
topush
--
;
if
(
op
->
type
==
REDIS_IO_LOAD
||
(
!
onlyloads
&&
(
now
-
op
->
ctime
)
>=
server
.
cache_flush_delay
))
if
(
op
->
type
!=
REDIS_IO_LOAD
&&
flags
&
REDIS_IO_ONLYLOADS
)
break
;
if
(
!
(
flags
&
REDIS_IO_ASAP
)
&&
(
now
-
op
->
ctime
)
<
server
.
cache_flush_delay
)
break
;
/* Don't add a SAVE job in the IO thread queue if there is already
* a save in progress for the same key. */
if
(
op
->
type
==
REDIS_IO_SAVE
&&
cacheScheduleIOGetFlags
(
op
->
db
,
op
->
key
)
&
REDIS_IO_SAVEINPROG
)
{
struct
dictEntry
*
de
;
robj
*
val
;
/* Don't add a SAVE job in queue if there is already
* a save in progress for the same key. */
if
(
op
->
type
==
REDIS_IO_SAVE
&&
cacheScheduleIOGetFlags
(
op
->
db
,
op
->
key
)
&
REDIS_IO_SAVEINPROG
)
{
/* Move the operation to the end of the list if there
* are other operations. Otherwise break, nothing to do
* here. */
if
(
listLength
(
server
.
cache_io_queue
)
>
1
)
{
listDelNode
(
server
.
cache_io_queue
,
ln
);
listAddNodeTail
(
server
.
cache_io_queue
,
op
);
continue
;
}
else
{
break
;
}
/* Move the operation at the end of the list if there
* are other operations, so we can try to process the next one.
* Otherwise break, nothing to do here. */
if
(
listLength
(
server
.
cache_io_queue
)
>
1
)
{
listDelNode
(
server
.
cache_io_queue
,
ln
);
listAddNodeTail
(
server
.
cache_io_queue
,
op
);
continue
;
}
else
{
break
;
}
}
redisLog
(
REDIS_DEBUG
,
"Creating IO %s Job for key %s"
,
op
->
type
==
REDIS_IO_LOAD
?
"load"
:
"save"
,
op
->
key
->
ptr
);
redisLog
(
REDIS_DEBUG
,
"Creating IO %s Job for key %s"
,
op
->
type
==
REDIS_IO_LOAD
?
"load"
:
"save"
,
op
->
key
->
ptr
);
if
(
op
->
type
==
REDIS_IO_LOAD
)
{
dsCreateIOJob
(
REDIS_IOJOB_LOAD
,
op
->
db
,
op
->
key
,
NULL
);
if
(
op
->
type
==
REDIS_IO_LOAD
)
{
dsCreateIOJob
(
REDIS_IOJOB_LOAD
,
op
->
db
,
op
->
key
,
NULL
);
}
else
{
/* Lookup the key, in order to put the current value in the IO
* Job. Otherwise if the key does not exists we schedule a disk
* store delete operation, setting the value to NULL. */
de
=
dictFind
(
op
->
db
->
dict
,
op
->
key
->
ptr
);
if
(
de
)
{
val
=
dictGetEntryVal
(
de
);
}
else
{
/* Lookup the key, in order to put the current value in the IO
* Job. Otherwise if the key does not exists we schedule a disk
* store delete operation, setting the value to NULL. */
de
=
dictFind
(
op
->
db
->
dict
,
op
->
key
->
ptr
);
if
(
de
)
{
val
=
dictGetEntryVal
(
de
);
}
else
{
/* Setting the value to NULL tells the IO thread to delete
* the key on disk. */
val
=
NULL
;
}
dsCreateIOJob
(
REDIS_IOJOB_SAVE
,
op
->
db
,
op
->
key
,
val
);
/* Setting the value to NULL tells the IO thread to delete
* the key on disk. */
val
=
NULL
;
}
/* Mark the operation as in progress. */
cacheScheduleIODelFlag
(
op
->
db
,
op
->
key
,
op
->
type
);
cacheScheduleIOAddFlag
(
op
->
db
,
op
->
key
,
(
op
->
type
==
REDIS_IO_LOAD
)
?
REDIS_IO_LOADINPROG
:
REDIS_IO_SAVEINPROG
);
/* Finally remove the operation from the queue.
* But we'll have trace of it in the hash table. */
listDelNode
(
server
.
cache_io_queue
,
ln
);
decrRefCount
(
op
->
key
);
zfree
(
op
);
pushed
++
;
}
else
{
break
;
/* too early */
dsCreateIOJob
(
REDIS_IOJOB_SAVE
,
op
->
db
,
op
->
key
,
val
);
}
/* Mark the operation as in progress. */
cacheScheduleIODelFlag
(
op
->
db
,
op
->
key
,
op
->
type
);
cacheScheduleIOAddFlag
(
op
->
db
,
op
->
key
,
(
op
->
type
==
REDIS_IO_LOAD
)
?
REDIS_IO_LOADINPROG
:
REDIS_IO_SAVEINPROG
);
/* Finally remove the operation from the queue.
* But we'll have trace of it in the hash table. */
listDelNode
(
server
.
cache_io_queue
,
ln
);
decrRefCount
(
op
->
key
);
zfree
(
op
);
pushed
++
;
}
return
pushed
;
}
...
...
src/redis.h
浏览文件 @
f771dc23
...
...
@@ -125,6 +125,10 @@
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
/* Generic IO flags */
#define REDIS_IO_ONLYLOADS 1
#define REDIS_IO_ASAP 2
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录