diff --git a/runtest-moduleapi b/runtest-moduleapi index 84cdb9bb8185b0aacf0c000416aedf1ed581d9d8..da61ab25cb1510b746b3fbce884f42f701dc2256 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -13,4 +13,5 @@ then fi make -C tests/modules && \ -$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/fork "${@}" + diff --git a/src/aof.c b/src/aof.c index 565ee807351333ebd0c967bce9a5a64f8876a7c3..fc62d86ed036471dde38dd179138261380c049c6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -264,9 +264,9 @@ int startAppendOnly(void) { strerror(errno)); return C_ERR; } - if (server.rdb_child_pid != -1) { + if (hasForkChild() && server.aof_child_pid == -1) { server.aof_rewrite_scheduled = 1; - serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible."); + serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible."); } else { /* If there is a pending AOF rewrite, we need to switch it off and * start a new one: the old one cannot be reused because it is not @@ -397,7 +397,7 @@ void flushAppendOnlyFile(int force) { * useful for graphing / monitoring purposes. */ if (sync_in_progress) { latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); - } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) { + } else if (hasForkChild()) { latencyAddSampleIfNeeded("aof-write-active-child",latency); } else { latencyAddSampleIfNeeded("aof-write-alone",latency); @@ -493,9 +493,8 @@ void flushAppendOnlyFile(int force) { try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. 
*/ - if (server.aof_no_fsync_on_rewrite && - (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) - return; + if (server.aof_no_fsync_on_rewrite && hasForkChild()) + return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { @@ -1562,39 +1561,24 @@ void aofClosePipes(void) { */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; - long long start; - if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; if (aofCreatePipes() != C_OK) return C_ERR; openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { char tmpfile[256]; /* Child */ - closeListeningSockets(0); redisSetProcTitle("redis-aof-rewrite"); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "AOF rewrite: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_AOF); + sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); } } else { /* Parent */ - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. 
*/ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); if (childpid == -1) { closeChildInfoPipe(); serverLog(LL_WARNING, @@ -1608,7 +1592,6 @@ int rewriteAppendOnlyFileBackground(void) { server.aof_rewrite_scheduled = 0; server.aof_rewrite_time_start = time(NULL); server.aof_child_pid = childpid; - updateDictResizePolicy(); /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.aof_rewrite_buf will start @@ -1623,7 +1606,7 @@ int rewriteAppendOnlyFileBackground(void) { void bgrewriteaofCommand(client *c) { if (server.aof_child_pid != -1) { addReplyError(c,"Background append only file rewriting already in progress"); - } else if (server.rdb_child_pid != -1) { + } else if (hasForkChild()) { server.aof_rewrite_scheduled = 1; addReplyStatus(c,"Background append only file rewriting scheduled"); } else if (rewriteAppendOnlyFileBackground() == C_OK) { diff --git a/src/childinfo.c b/src/childinfo.c index 719025e8c7ad4e624ae797d3ac346e1c304e5fd2..fa0600552ff7cc2479cd1c6722e6d1b4ef934f98 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -80,6 +80,8 @@ void receiveChildInfo(void) { server.stat_rdb_cow_bytes = server.child_info_data.cow_size; } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) { server.stat_aof_cow_bytes = server.child_info_data.cow_size; + } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_MODULE) { + server.stat_module_cow_bytes = server.child_info_data.cow_size; } } } diff --git a/src/db.c b/src/db.c index 51f5a12b4725eedefd06f814a959ce8426f80a19..4a489036a7b3655e7d8c8f756d5268ec56de3b4a 100644 --- a/src/db.c +++ b/src/db.c @@ -60,10 +60,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. 
*/ - if (server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && - !(flags & LOOKUP_NOTOUCH)) - { + if (!hasForkChild() && !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { diff --git a/src/defrag.c b/src/defrag.c index ecf0255dcb4e9dcec326805486dd426c79312d6f..93c6a46194aa6c9e1ac329f8c6c1a9219cb2bc99 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -1039,7 +1039,7 @@ void activeDefragCycle(void) { mstime_t latency; int quit = 0; - if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1) + if (hasForkChild()) return; /* Defragging memory while there's a fork will just do damage. */ /* Once a second, check if we the fragmentation justfies starting a scan diff --git a/src/module.c b/src/module.c index f4f753c007ec8059c62ce5d2a05d24a66448a62c..03cc367020904ec9e7e2593189dc9c49c52ef2b4 100644 --- a/src/module.c +++ b/src/module.c @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include +#include #define REDISMODULE_CORE 1 #include "redismodule.h" @@ -291,6 +292,14 @@ typedef struct RedisModuleCommandFilter { /* Registered filters */ static list *moduleCommandFilters; +typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); + +static struct RedisModuleForkInfo { + RedisModuleForkDoneHandler done_handler; + void* done_handler_user_data; +} moduleForkInfo = {0}; + + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -5028,6 +5037,94 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/* -------------------------------------------------------------------------- + * Module fork API + * -------------------------------------------------------------------------- */ + +/* Create a background child process with the current frozen snapshot of the + * main process where you can do some processing in the 
background without + * affecting / freezing the traffic and no need for threads and GIL locking. + * Note that Redis allows for only one concurrent fork. + * When the child wants to exit, it should call RedisModule_ExitFromChild. + * If the parent wants to kill the child it should call RedisModule_KillForkChild. + * The done handler callback will be executed on the parent process when the + * child exited (but not when killed) + * Return: -1 on failure, on success the parent process will get a positive PID + * of the child, and the child process will get 0. + */ +int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) +{ + pid_t childpid; + if (hasForkChild()) { + return -1; + } + + openChildInfoPipe(); + if ((childpid = redisFork()) == 0) { + /* Child */ + redisSetProcTitle("redis-module-fork"); + } else if (childpid == -1) { + closeChildInfoPipe(); + serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno)); + } else { + /* Parent */ + server.module_child_pid = childpid; + moduleForkInfo.done_handler = cb; + moduleForkInfo.done_handler_user_data = user_data; + serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid); + } + return childpid; +} + +/* Call from the child process when you want to terminate it. + * retcode will be provided to the done handler executed on the parent process. + */ +int RM_ExitFromChild(int retcode) +{ + sendChildCOWInfo(CHILD_INFO_TYPE_MODULE, "Module fork"); + exitFromChild(retcode); + return REDISMODULE_OK; +} + +/* Can be used to kill the forked child process from the parent process. + * child_pid should be the return value of RedisModule_Fork. */ +int RM_KillForkChild(int child_pid) +{ + int statloc; + /* No module child? return. 
*/ + if (server.module_child_pid == -1) return REDISMODULE_ERR; + /* Make sure the module knows the pid it wants to kill (not trying to + * randomly kill other module's forks) */ + if (server.module_child_pid != child_pid) return REDISMODULE_ERR; + /* Kill module child, wait for child exit. */ + serverLog(LL_NOTICE,"Killing running module fork child: %ld", + (long) server.module_child_pid); + if (kill(server.module_child_pid,SIGUSR1) != -1) { + while(wait3(&statloc,0,NULL) != server.module_child_pid); + } + /* Clear the module fork state and release the child info pipe. */ + server.module_child_pid = -1; + moduleForkInfo.done_handler = NULL; + moduleForkInfo.done_handler_user_data = NULL; + closeChildInfoPipe(); + updateDictResizePolicy(); + return REDISMODULE_OK; +} + +void ModuleForkDoneHandler(int exitcode, int bysignal) +{ + serverLog(LL_NOTICE, + "Module fork exited pid: %d, retcode: %d, bysignal: %d", + server.module_child_pid, exitcode, bysignal); + if (moduleForkInfo.done_handler) { + moduleForkInfo.done_handler(exitcode, bysignal, + moduleForkInfo.done_handler_user_data); + } + server.module_child_pid = -1; + moduleForkInfo.done_handler = NULL; + moduleForkInfo.done_handler_user_data = NULL; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -5490,4 +5587,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CommandFilterArgInsert); REGISTER_API(CommandFilterArgReplace); REGISTER_API(CommandFilterArgDelete); + REGISTER_API(Fork); + REGISTER_API(ExitFromChild); + REGISTER_API(KillForkChild); } diff --git a/src/rdb.c b/src/rdb.c index c566378fbd19dcf78a185c5f26a3d5e66c3f0568..0c3a80d01884db1f1b4360f4730dd43ef23ae78f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1293,40 +1293,25 @@ werr: int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; - long long start; - if (server.aof_child_pid != -1 || 
server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { int retval; /* Child */ - closeListeningSockets(0); redisSetProcTitle("redis-rdb-bgsave"); retval = rdbSave(filename,rsi); if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); + sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); } exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); if (childpid == -1) { closeChildInfoPipe(); server.lastbgsave_status = C_ERR; @@ -1338,7 +1323,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; - updateDictResizePolicy(); return C_OK; } return C_OK; /* unreached */ @@ -2279,10 +2263,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { listNode *ln; listIter li; pid_t childpid; - long long start; int pipefds[2]; - if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR; + if (hasForkChild()) return C_ERR; /* Before to fork, create a pipe that will be used in order to * send back to the parent the IDs of the slaves that successfully @@ -2318,8 +2301,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { /* Create the child process. 
*/ openChildInfoPipe(); - start = ustime(); - if ((childpid = fork()) == 0) { + if ((childpid = redisFork()) == 0) { /* Child */ int retval; rio slave_sockets; @@ -2327,7 +2309,6 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { rioInitWithFdset(&slave_sockets,fds,numfds); zfree(fds); - closeListeningSockets(0); redisSetProcTitle("redis-rdb-to-slaves"); retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); @@ -2335,16 +2316,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { retval = C_ERR; if (retval == C_OK) { - size_t private_dirty = zmalloc_get_private_dirty(-1); - - if (private_dirty) { - serverLog(LL_NOTICE, - "RDB: %zu MB of memory used by copy-on-write", - private_dirty/(1024*1024)); - } - - server.child_info_data.cow_size = private_dirty; - sendChildInfo(CHILD_INFO_TYPE_RDB); + sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); /* If we are returning OK, at least one slave was served * with the RDB file as expected, so we need to send a report @@ -2413,16 +2385,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { close(pipefds[1]); closeChildInfoPipe(); } else { - server.stat_fork_time = ustime()-start; - server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); - serverLog(LL_NOTICE,"Background RDB transfer started by pid %d", childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_SOCKET; - updateDictResizePolicy(); } zfree(clientids); zfree(fds); @@ -2465,13 +2432,13 @@ void bgsaveCommand(client *c) { if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); - } else if (server.aof_child_pid != -1) { + } else if (hasForkChild()) { if (schedule) { server.rdb_bgsave_scheduled = 1; addReplyStatus(c,"Background saving scheduled"); } else { addReplyError(c, - "An AOF log rewriting in progress: can't BGSAVE right now. 
" + "Another BG operation is in progress: can't BGSAVE right now. " "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever " "possible."); } diff --git a/src/redismodule.h b/src/redismodule.h index b9c73957bb228440c4f4563975b52413a8a5ccd3..60681da7c640ab33e2635e5a5dab387e123bfc2e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -173,6 +173,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); +typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); #define REDISMODULE_TYPE_METHOD_VERSION 1 typedef struct RedisModuleTypeMethods { @@ -357,6 +358,9 @@ const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(R int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg); int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos); +int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); +int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); +int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); #endif /* This is included inline inside each Redis module. 
*/ @@ -528,6 +532,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CommandFilterArgInsert); REDISMODULE_GET_API(CommandFilterArgReplace); REDISMODULE_GET_API(CommandFilterArgDelete); + REDISMODULE_GET_API(Fork); + REDISMODULE_GET_API(ExitFromChild); + REDISMODULE_GET_API(KillForkChild); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/replication.c b/src/replication.c index 26e7cf8f0825b9d1fdd56a7645c3f4ea1f1e3c91..7adf5ba38254244a7dee503134fe77e79604d33e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -751,11 +751,11 @@ void syncCommand(client *c) { /* Target is disk (or the slave is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, * let's start one. */ - if (server.aof_child_pid == -1) { + if (!hasForkChild()) { startBgsaveForReplication(c->slave_capa); } else { serverLog(LL_NOTICE, - "No BGSAVE in progress, but an AOF rewrite is active. " + "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } @@ -2899,7 +2899,7 @@ void replicationCron(void) { * In case of diskless replication, we make sure to wait the specified * number of seconds (according to configuration) so that other slaves * have the time to arrive before we start streaming. 
*/ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + if (!hasForkChild()) { time_t idle, max_idle = 0; int slaves_waiting = 0; int mincapa = -1; diff --git a/src/scripting.c b/src/scripting.c index 032bfdf107f66dfd66f1dc7a744ca6bb4e169a0b..deb4064571b924a0a1d0b51c08f2183c811f0d5f 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1693,7 +1693,7 @@ void ldbSendLogs(void) { int ldbStartSession(client *c) { ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0; if (ldb.forked) { - pid_t cp = fork(); + pid_t cp = redisFork(); if (cp == -1) { addReplyError(c,"Fork() failed: can't run EVAL in debugging mode."); return 0; @@ -1710,7 +1710,6 @@ int ldbStartSession(client *c) { * socket to make sure if the parent crashes a reset is sent * to the clients. */ serverLog(LL_WARNING,"Redis forked for debugging eval"); - closeListeningSockets(0); } else { /* Parent */ listAddNodeTail(ldb.children,(void*)(unsigned long)cp); diff --git a/src/server.c b/src/server.c index 4337b8f016a5cca71f4ee533506285e40db70579..675b638d60833f565bc3e2d813dc131d3aacd5ee 100644 --- a/src/server.c +++ b/src/server.c @@ -1447,12 +1447,18 @@ int incrementallyRehash(int dbid) { * for dict.c to resize the hash tables accordingly to the fact we have o not * running childs. */ void updateDictResizePolicy(void) { - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) + if (!hasForkChild()) dictEnableResize(); else dictDisableResize(); } +int hasForkChild() { + return server.rdb_child_pid != -1 || + server.aof_child_pid != -1 || + server.module_child_pid != -1; +} + /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -1689,7 +1695,7 @@ void databasesCron(void) { /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. 
*/ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + if (!hasForkChild()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ @@ -1886,15 +1892,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasForkChild() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ - if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || - ldbPendingChildren()) + if (hasForkChild() || ldbPendingChildren()) { int statloc; pid_t pid; @@ -1907,16 +1912,20 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (pid == -1) { serverLog(LL_WARNING,"wait3() returned an error: %s. " - "rdb_child_pid = %d, aof_child_pid = %d", + "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, - (int) server.aof_child_pid); + (int) server.aof_child_pid, + (int) server.module_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); if (!bysignal && exitcode == 0) receiveChildInfo(); + } else if (pid == server.module_child_pid) { + ModuleForkDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); } else { if (!ldbRemoveChild(pid)) { serverLog(LL_WARNING, @@ -1954,8 +1963,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Trigger an AOF rewrite if needed. 
*/ if (server.aof_state == AOF_ON && - server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && + !hasForkChild() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { @@ -2013,7 +2021,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * Note: this code must be after the replicationCron() call above so * make sure when refactoring this file to keep this order. This is useful * because we want to give priority to RDB savings for replication. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasForkChild() && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) @@ -2784,6 +2792,7 @@ void initServer(void) { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; + server.module_child_pid = -1; server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; @@ -2802,6 +2811,7 @@ void initServer(void) { server.stat_peak_memory = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; + server.stat_module_cow_bytes = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; @@ -4040,7 +4050,9 @@ sds genRedisInfoString(char *section) { "aof_current_rewrite_time_sec:%jd\r\n" "aof_last_bgrewrite_status:%s\r\n" "aof_last_write_status:%s\r\n" - "aof_last_cow_size:%zu\r\n", + "aof_last_cow_size:%zu\r\n" + "module_fork_in_progress:%d\r\n" + "module_fork_last_cow_size:%zu\r\n", server.loading, server.dirty, server.rdb_child_pid != -1, @@ -4058,7 +4070,9 @@ sds genRedisInfoString(char *section) { -1 : time(NULL)-server.aof_rewrite_time_start), (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", (server.aof_last_write_status == C_OK) ? 
"ok" : "err", - server.stat_aof_cow_bytes); + server.stat_aof_cow_bytes, + server.module_child_pid != -1, + server.stat_module_cow_bytes); if (server.aof_enabled) { info = sdscatprintf(info, @@ -4554,6 +4568,58 @@ void setupSignalHandlers(void) { return; } +static void sigKillChildHandler(int sig) { + UNUSED(sig); + /* this handler is needed to resolve a valgrind warning */ + serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now."); + exitFromChild(1); +} + +void setupChildSignalHandlers(void) { + struct sigaction act; + + /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. + * Otherwise, sa_handler is used. */ + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = sigKillChildHandler; + sigaction(SIGUSR1, &act, NULL); + return; +} + +int redisFork() { + int childpid; + long long start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + closeListeningSockets(0); + setupChildSignalHandlers(); + } else { + /* Parent */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. 
*/ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + return -1; + } + updateDictResizePolicy(); + } + return childpid; +} + +void sendChildCOWInfo(int ptype, char *pname) { + size_t private_dirty = zmalloc_get_private_dirty(-1); + + if (private_dirty) { + serverLog(LL_NOTICE, + "%s: %zu MB of memory used by copy-on-write", + pname, private_dirty/(1024*1024)); + } + + server.child_info_data.cow_size = private_dirty; + sendChildInfo(ptype); +} + void memtest(size_t megabytes, int passes); /* Returns 1 if there is --sentinel among the arguments or if diff --git a/src/server.h b/src/server.h index f81b1010ef8f6f6e300f31b680a08931b4d4f49b..deaaf2636010b958c7a513409f20aff65c36b7c5 100644 --- a/src/server.h +++ b/src/server.h @@ -1030,6 +1030,7 @@ struct clusterState; #define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL #define CHILD_INFO_TYPE_RDB 0 #define CHILD_INFO_TYPE_AOF 1 +#define CHILD_INFO_TYPE_MODULE 3 struct redisServer { /* General */ @@ -1065,6 +1066,7 @@ struct redisServer { int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a client blocked on a module command needs to be processed. */ + pid_t module_child_pid; /* PID of module child */ /* Networking */ int port; /* TCP listening port */ int tcp_backlog; /* TCP listen() backlog */ @@ -1138,6 +1140,7 @@ struct redisServer { _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ + size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. 
*/ struct { @@ -1528,6 +1531,7 @@ void moduleAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +void ModuleForkDoneHandler(int exitcode, int bysignal); /* Utils */ long long ustime(void); @@ -1786,6 +1790,11 @@ void closeChildInfoPipe(void); void sendChildInfo(int process_type); void receiveChildInfo(void); +/* Fork helpers */ +int redisFork(); +int hasForkChild(); +void sendChildCOWInfo(int ptype, char *pname); + /* acl.c -- Authentication related prototypes. */ extern rax *Users; extern user *DefaultUser; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 014d20afaf3dd9e349f1f3eff96204134d72eaf3..846d4c87d2b063b0450c61b579c3da3c600312fd 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -13,12 +13,16 @@ endif .SUFFIXES: .c .so .xo .o -all: commandfilter.so +all: commandfilter.so fork.so .c.xo: $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ commandfilter.xo: ../../src/redismodule.h +fork.xo: ../../src/redismodule.h commandfilter.so: commandfilter.xo $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + +fork.so: fork.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/tests/modules/fork.c b/tests/modules/fork.c new file mode 100644 index 0000000000000000000000000000000000000000..0804e4355ab37bed77298f584ad6078541f44b76 --- /dev/null +++ b/tests/modules/fork.c @@ -0,0 +1,84 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define UNUSED(V) ((void) V) + +int child_pid = -1; +int exitted_with_code = -1; + +void done_handler(int exitcode, int bysignal, void *user_data) { + child_pid = -1; + exitted_with_code = exitcode; + assert(user_data==(void*)0xdeadbeef); + UNUSED(bysignal); +} + +int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + long long code_to_exit_with; + if (argc != 2) { + RedisModule_WrongArity(ctx); + return 
REDISMODULE_OK; + } + RedisModule_StringToLongLong(argv[1], &code_to_exit_with); + exitted_with_code = -1; + child_pid = RedisModule_Fork(done_handler, (void*)0xdeadbeef); + if (child_pid < 0) { + RedisModule_ReplyWithError(ctx, "Fork failed"); + return REDISMODULE_OK; + } else if (child_pid > 0) { + /* parent */ + RedisModule_ReplyWithLongLong(ctx, child_pid); + return REDISMODULE_OK; + } + + /* child */ + RedisModule_Log(ctx, "notice", "fork child started"); + usleep(200000); + RedisModule_Log(ctx, "notice", "fork child exiting"); + RedisModule_ExitFromChild(code_to_exit_with); + /* unreachable */ + return 0; +} + +int fork_exitcode(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + UNUSED(argv); + UNUSED(argc); + RedisModule_ReplyWithLongLong(ctx, exitted_with_code); + return REDISMODULE_OK; +} + +int fork_kill(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + UNUSED(argv); + UNUSED(argc); + if (RedisModule_KillForkChild(child_pid) != REDISMODULE_OK) + RedisModule_ReplyWithError(ctx, "KillForkChild failed"); + else + RedisModule_ReplyWithLongLong(ctx, 1); + child_pid = -1; + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + if (RedisModule_Init(ctx,"fork",1,REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fork.create", fork_create,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fork.exitcode", fork_exitcode,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fork.kill", fork_kill,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/fork.tcl b/tests/unit/moduleapi/fork.tcl new file mode 100644 index 0000000000000000000000000000000000000000..f7d7e47d56a5d38310498703197fa21e3f2888de --- /dev/null +++ b/tests/unit/moduleapi/fork.tcl @@ -0,0 
+1,32 @@ +set testmodule [file normalize tests/modules/fork.so] + +proc count_log_message {pattern} { + set result [exec grep -c $pattern < [srv 0 stdout]] +} + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module fork} { + # the argument to fork.create is the exitcode on termination + r fork.create 3 + wait_for_condition 20 100 { + [r fork.exitcode] != -1 + } else { + fail "fork didn't terminate" + } + r fork.exitcode + } {3} + + test {Module fork kill} { + r fork.create 3 + after 20 + r fork.kill + after 100 + + assert {[count_log_message "fork child started"] eq "2"} + assert {[count_log_message "Received SIGUSR1 in child"] eq "1"} + assert {[count_log_message "fork child exiting"] eq "1"} + } + +}