Wed Mar 9 07:10:07 PST 2005
- Previous message: [Slony1-general] error
- Next message: [Slony1-general] signal handling watchdog using forked processes
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Hi All,
I'm one of the new additions to the slony development process and the
first change I've been asked to make is summarized below:
I have attached a diff against the slon src tree as well as a tarball of
the complete set of changes.
Slony-I 1.1 Change Request:
The problem at hand is related to signal handling differences in
multithreaded programs between different operating systems. In the slon
process, it is intended that only the "main" thread is dealing with
signals and that it uses the regular thread communication like mutexes
and condition variables to control all the other working threads.
This mutex and condition-variable communication can sometimes lock up,
which freezes the entire slon daemon. In extreme cases this can happen
even after one of the worker threads has sent the main thread a signal
to restart the slon process.
What I had in mind to fix this is to have the program actually fork()
very early on. The parent process will then be the only one dealing with
signals, while the child ignores all signals entirely. The parent and
the master thread of the child (which is the current slon) will
communicate over a pipe or a socketpair. This way, the parent can
detect that the slon has stopped responding altogether and issue a
signal 9 (SIGKILL) to clean up the situation.
In other words, building a watchdog process right into the slon executable.
Summary of Changes:
slon main() will now fork() at the start and let the parent process
handle cleanup, signals, and termination of the child process. The
child process runs the scheduler, which talks to the parent (watchdog)
process, and vice versa, via two pipes (watchdog_pipe and
sched_wakeuppipe, both created with pipe()). When shutting down the
child process for a restart or termination, non-blocking reads and
writes are used to avoid lockups. If the scheduler process fails to shut
down cleanly, the parent sends it SIGKILL so that the signal handler
still completes in a timely manner.
globals:
slon.c
------
int watchdog_pipe[2]; // pipe used to talk with the child process
int sched_wakeuppipe[2]; // pipe used for communication between the scheduler and the parent process
runtime_config.c
----------------
pid_t slon_pid; // current pid
pid_t slon_ppid; // parent pid
pid_t slon_cpid; // child pid
macros:
slon_abort() and slon_restart() now check which process in the tree they
are running in: in the child (slon_ppid != 0) they signal the parent
(watchdog) process; otherwise they signal the current process.
/*
 * Both macros choose the process to signal in the same way: a forked
 * child (slon_ppid != 0) signals its parent, the watchdog.  Otherwise
 * (the parent, or the process before fork() while slon_ppid is still 0)
 * slon_pid, i.e. the current process, is signalled.
 *
 * slon_abort() requests termination (SIGTERM) and then ends the calling
 * thread; slon_restart() only requests a restart (SIGHUP) and returns.
 */
#define slon_abort() \
do { \
	pid_t	slon_sig_target_ = (slon_ppid != 0) ? slon_ppid : slon_pid; \
	kill(slon_sig_target_, SIGTERM); \
	pthread_exit(NULL); \
} while (0)
#define slon_restart() \
do { \
	pid_t	slon_sig_target_ = (slon_ppid != 0) ? slon_ppid : slon_pid; \
	kill(slon_sig_target_, SIGHUP); \
} while (0)
functions:
slon.c
------
static void sighandler(int signo); // new slon parent process signal handler
static void main_sigalrmhandler(int signo); // new scheduler alarm signal handler
static void slon_kill_child(void); // new pipe notify based child termination function
-------------- next part --------------
A non-text attachment was scrubbed...
Name: slon.tgz
Type: application/x-gzip
Size: 73458 bytes
Desc:
Url : http://gborg.postgresql.org/pipermail/slony1-general/attachments/20050308/3d9e2a13/slon-0001.bin
-------------- next part --------------
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.19
diff -c -r1.19 cleanup_thread.c
*** cleanup_thread.c 12 Jan 2005 17:27:09 -0000 1.19
--- cleanup_thread.c 7 Mar 2005 20:01:54 -0000
***************
*** 65,70 ****
--- 65,71 ----
{
kill(getpid(), SIGTERM);
pthread_exit(NULL);
+ /* slon_abort(); */
}
dbconn = conn->dbconn;
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.29
diff -c -r1.29 local_listen.c
*** local_listen.c 12 Jan 2005 17:27:10 -0000 1.29
--- local_listen.c 7 Mar 2005 20:01:55 -0000
***************
*** 160,166 ****
slon_log(SLON_INFO,
"localListenThread: got restart notification - "
"signal scheduler\n");
! slon_restart();
}
/*
--- 160,166 ----
slon_log(SLON_INFO,
"localListenThread: got restart notification - "
"signal scheduler\n");
! kill(getppid(), SIGHUP);
}
/*
Index: runtime_config.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/runtime_config.c,v
retrieving revision 1.23
diff -c -r1.23 runtime_config.c
*** runtime_config.c 12 Jan 2005 17:27:11 -0000 1.23
--- runtime_config.c 7 Mar 2005 20:01:55 -0000
***************
*** 33,38 ****
--- 33,40 ----
* ---------- Global data ----------
*/
pid_t slon_pid;
+ pid_t slon_cpid;
+ pid_t slon_ppid;
char *rtcfg_cluster_name = NULL;
char *rtcfg_namespace = NULL;
char *rtcfg_conninfo = NULL;
Index: scheduler.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/scheduler.c,v
retrieving revision 1.17
diff -c -r1.17 scheduler.c
*** scheduler.c 12 Jan 2005 17:27:11 -0000 1.17
--- scheduler.c 7 Mar 2005 20:01:55 -0000
***************
*** 15,24 ****
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
- #include <signal.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
--- 15,24 ----
#include <stdio.h>
#include <stdlib.h>
+ #include <signal.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
***************
*** 36,42 ****
#define PF_LOCAL PF_UNIX
#endif
-
/*
* ---------- Static data ----------
*/
--- 36,41 ----
***************
*** 45,51 ****
static int sched_numfd = 0;
static fd_set sched_fdset_read;
static fd_set sched_fdset_write;
- static int sched_wakeuppipe[2];
static SlonConn *sched_waitqueue_head = NULL;
static SlonConn *sched_waitqueue_tail = NULL;
--- 44,49 ----
***************
*** 55,71 ****
static pthread_mutex_t sched_master_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sched_master_cond = PTHREAD_COND_INITIALIZER;
- static sigset_t sched_sigset;
-
/*
* ---------- Local functions ----------
*/
static void *sched_mainloop(void *);
- static void sched_sighandler(int signo);
- static void sched_sighuphandler(int signo);
static void sched_add_fdset(int fd, fd_set * fds);
static void sched_remove_fdset(int fd, fd_set * fds);
/*
--- 53,66 ----
static pthread_mutex_t sched_master_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sched_master_cond = PTHREAD_COND_INITIALIZER;
/*
* ---------- Local functions ----------
*/
static void *sched_mainloop(void *);
static void sched_add_fdset(int fd, fd_set * fds);
static void sched_remove_fdset(int fd, fd_set * fds);
+ static void sched_shutdown();
/*
***************
*** 85,100 ****
sched_main_thread = pthread_self();
/*
- * Block signals. Since sched_start_mainloop() is called before any other
- * thread is created, this will be inherited by all threads in the system.
- */
- sigemptyset(&sched_sigset);
- sigaddset(&sched_sigset, SIGHUP);
- sigaddset(&sched_sigset, SIGINT);
- sigaddset(&sched_sigset, SIGTERM);
- pthread_sigmask(SIG_BLOCK, &sched_sigset, NULL);
-
- /*
* Grab the scheduler master lock
*/
if (pthread_mutex_lock(&sched_master_lock) < 0)
--- 80,85 ----
***************
*** 153,184 ****
int
sched_wait_mainloop(void)
{
- int signo;
-
- /*
- * Wait for signal.
- */
- sigemptyset(&sched_sigset);
- sigaddset(&sched_sigset, SIGHUP);
- sigaddset(&sched_sigset, SIGINT);
- sigaddset(&sched_sigset, SIGTERM);
- sigwait(&sched_sigset, &signo);
-
- sigemptyset(&sched_sigset);
- pthread_sigmask(SIG_SETMASK, &sched_sigset, NULL);
-
- switch (signo)
- {
- case SIGHUP:
- sched_sighuphandler(signo);
- break;
-
- case SIGINT:
- case SIGTERM:
- sched_sighandler(signo);
- break;
- }
-
/*
* Wait for the scheduler to finish.
*/
--- 138,143 ----
***************
*** 413,429 ****
FD_ZERO(&sched_fdset_read);
FD_ZERO(&sched_fdset_write);
- /*
- * Create a pipe used by the main thread to cleanly wakeup the scheduler
- * on signals.
- */
- if (pipe(sched_wakeuppipe) < 0)
- {
- perror("sched_mainloop: pipe()");
- sched_status = SCHED_STATUS_ERROR;
- pthread_cond_signal(&sched_master_cond);
- pthread_exit(NULL);
- }
sched_add_fdset(sched_wakeuppipe[0], &sched_fdset_read);
/*
--- 372,377 ----
***************
*** 575,580 ****
--- 523,533 ----
sched_status = SCHED_STATUS_ERROR;
break;
}
+
+ if (buf[0] == 'p')
+ {
+ sched_status = SCHED_STATUS_SHUTDOWN;
+ }
}
/*
***************
*** 644,652 ****
* close the scheduler heads-up socket pair so nobody will think we're
* listening any longer.
*/
close(sched_wakeuppipe[0]);
close(sched_wakeuppipe[1]);
! sched_wakeuppipe[0] = sched_wakeuppipe[1] = -1;
/*
* Then we cond_signal all connections that are in the queue.
--- 597,609 ----
* close the scheduler heads-up socket pair so nobody will think we're
* listening any longer.
*/
+
+ /*
close(sched_wakeuppipe[0]);
+ sched_wakeuppipe[0] = -1;
close(sched_wakeuppipe[1]);
! sched_wakeuppipe[1] = -1;
! */
/*
* Then we cond_signal all connections that are in the queue.
***************
*** 692,698 ****
* conditions with signals. ----------
*/
static void
! sched_sighandler(int signo)
{
/*
* Lock the master mutex and make sure that we are the main thread
--- 649,655 ----
* conditions with signals. ----------
*/
static void
! sched_shutdown()
{
/*
* Lock the master mutex and make sure that we are the main thread
***************
*** 733,745 ****
}
- static void
- sched_sighuphandler(int signo)
- {
- slon_restart_request = true;
- sched_sighandler(signo);
- }
-
/*
* ---------- sched_add_fdset
--- 690,695 ----
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.45
diff -c -r1.45 slon.c
*** slon.c 18 Feb 2005 00:15:57 -0000 1.45
--- slon.c 7 Mar 2005 20:01:56 -0000
***************
*** 32,43 ****
/*
* ---------- Global data ----------
*/
! int slon_restart_request = false;
pthread_mutex_t slon_wait_listen_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t slon_wait_listen_cond = PTHREAD_COND_INITIALIZER;
-
/*
* ---------- Local data ----------
*/
--- 32,43 ----
/*
* ---------- Global data ----------
*/
! int watchdog_pipe[2];
! int sched_wakeuppipe[2];
pthread_mutex_t slon_wait_listen_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t slon_wait_listen_cond = PTHREAD_COND_INITIALIZER;
/*
* ---------- Local data ----------
*/
***************
*** 51,61 ****
static pthread_t main_thread;
static char *const *main_argv;
! static void sigalrmhandler(int signo);
int slon_log_level;
char *pid_file;
char *archive_dir = NULL;
/*
* ---------- main ----------
--- 51,66 ----
static pthread_t main_thread;
static char *const *main_argv;
!
! static void sighandler(int signo);
! static void main_sigalrmhandler(int signo);
! static void slon_kill_child(void);
int slon_log_level;
char *pid_file;
char *archive_dir = NULL;
+ int child_status;
+
/*
* ---------- main ----------
***************
*** 72,84 ****
PGconn *startup_conn;
int c;
int errors = 0;
extern int optind;
extern char *optarg;
!
InitializeConfOptions();
-
while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:hv")) != EOF)
{
switch (c)
--- 77,91 ----
PGconn *startup_conn;
int c;
int errors = 0;
+ int signo;
+ char pipe_c;
+ pid_t pid;
extern int optind;
extern char *optarg;
! struct sigaction act;
InitializeConfOptions();
while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:hv")) != EOF)
{
switch (c)
***************
*** 146,151 ****
--- 153,161 ----
* identifier
*/
slon_pid = getpid();
+ slon_cpid = 0;
+ slon_ppid = 0;
+ main_argv = argv;
if ((char *)argv[optind])
{
***************
*** 214,220 ****
}
if (PQstatus(startup_conn) != CONNECTION_OK)
{
! slon_log(SLON_FATAL, "main: Cannot connect to local database - %s",
PQerrorMessage(startup_conn));
PQfinish(startup_conn);
slon_exit(-1);
--- 224,230 ----
}
if (PQstatus(startup_conn) != CONNECTION_OK)
{
! slon_log(SLON_FATAL, "main: Cannot connect to local database - %s\n",
PQerrorMessage(startup_conn));
PQfinish(startup_conn);
slon_exit(-1);
***************
*** 252,612 ****
}
}
-
/*
! * Start the event scheduling system
*/
! slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n");
! if (sched_start_mainloop() < 0)
slon_exit(-1);
!
! slon_log(SLON_CONFIG, "main: loading current cluster configuration\n");
!
! /*
! * Begin a transaction
! */
! res = PQexec(startup_conn,
! "start transaction; "
! "set transaction isolation level serializable;");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
! slon_log(SLON_FATAL, "Cannot start transaction - %s",
! PQresultErrorMessage(res));
! PQclear(res);
slon_exit(-1);
}
- PQclear(res);
/*
! * Read configuration table sl_node
*/
! dstring_init(&query);
! slon_mkquery(&query,
! "select no_id, no_active, no_comment, "
! " (select coalesce(max(con_seqno),0) from %s.sl_confirm "
! " where con_origin = no_id and con_received = %d) "
! " as last_event "
! "from %s.sl_node "
! "order by no_id; ",
! rtcfg_namespace, rtcfg_nodeid, rtcfg_namespace);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
! slon_log(SLON_FATAL, "main: Cannot get node list - %s",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
slon_exit(-1);
}
! for (i = 0, n = PQntuples(res); i < n; i++)
{
! int no_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! int no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0;
! char *no_comment = PQgetvalue(res, i, 2);
! int64 last_event;
!
! if (no_id == rtcfg_nodeid)
! {
! /*
! * Complete our own local node entry
! */
! rtcfg_nodeactive = no_active;
! rtcfg_nodecomment = strdup(no_comment);
}
! else
{
! /*
! * Add a remote node
! */
! slon_scanint64(PQgetvalue(res, i, 3), &last_event);
! rtcfg_storeNode(no_id, no_comment);
! rtcfg_setNodeLastEvent(no_id, last_event);
!
! /*
! * If it is active, remember for activation just before we start
! * processing events.
! */
! if (no_active)
! rtcfg_needActivate(no_id);
}
- }
- PQclear(res);
! /*
! * Read configuration table sl_path - the interesting pieces
! */
! slon_mkquery(&query,
! "select pa_server, pa_conninfo, pa_connretry "
! "from %s.sl_path where pa_client = %d",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get path config - %s",
! PQresultErrorMessage(res));
PQclear(res);
- dstring_free(&query);
- slon_exit(-1);
- }
- for (i = 0, n = PQntuples(res); i < n; i++)
- {
- int pa_server = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
- char *pa_conninfo = PQgetvalue(res, i, 1);
- int pa_connretry = (int)strtol(PQgetvalue(res, i, 2), NULL, 10);
! rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
! }
! PQclear(res);
! /*
! * Load the initial listen configuration
! */
! rtcfg_reloadListen(startup_conn);
! /*
! * Read configuration table sl_set
! */
! slon_mkquery(&query,
! "select set_id, set_origin, set_comment "
! "from %s.sl_set",
! rtcfg_namespace);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get set config - %s",
! PQresultErrorMessage(res));
PQclear(res);
- dstring_free(&query);
- slon_exit(-1);
- }
- for (i = 0, n = PQntuples(res); i < n; i++)
- {
- int set_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
- int set_origin = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
- char *set_comment = PQgetvalue(res, i, 2);
! rtcfg_storeSet(set_id, set_origin, set_comment);
! }
! PQclear(res);
! /*
! * Read configuration table sl_subscribe - only subscriptions for local node
! */
! slon_mkquery(&query,
! "select sub_set, sub_provider, sub_forward, sub_active "
! "from %s.sl_subscribe "
! "where sub_receiver = %d",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get subscription config - %s",
! PQresultErrorMessage(res));
PQclear(res);
dstring_free(&query);
! slon_exit(-1);
}
! for (i = 0, n = PQntuples(res); i < n; i++)
{
! int sub_set = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! int sub_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
! char *sub_forward = PQgetvalue(res, i, 2);
! char *sub_active = PQgetvalue(res, i, 3);
!
! rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward);
! if (*sub_active == 't')
! rtcfg_enableSubscription(sub_set, sub_provider, sub_forward);
}
! PQclear(res);
! /*
! * Remember the last known local event sequence
! */
! slon_mkquery(&query,
! "select coalesce(max(ev_seqno), -1) from %s.sl_event "
! "where ev_origin = '%d'",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
! slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
}
- if (PQntuples(res) == 0)
- strcpy(rtcfg_lastevent, "-1");
- else if (PQgetisnull(res, 0, 0))
- strcpy(rtcfg_lastevent, "-1");
else
! strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
! PQclear(res);
! dstring_free(&query);
! slon_log(SLON_DEBUG2,
! "main: last local event sequence = %s\n",
! rtcfg_lastevent);
! /*
! * Rollback the transaction we used to get the config snapshot
! */
! res = PQexec(startup_conn, "rollback transaction;");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
! slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s",
! PQresultErrorMessage(res));
! PQclear(res);
slon_exit(-1);
}
! PQclear(res);
! /*
! * Done with the startup, don't need the local connection any more.
! */
! PQfinish(startup_conn);
! slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
! /*
! * Create the local event thread that monitors the local node
! * for administrative events to adjust the configuration at
! * runtime. We wait here until the local listen thread has
! * checked that there is no other slon daemon running.
! */
! pthread_mutex_lock(&slon_wait_listen_lock);
! if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n",
! strerror(errno));
! slon_abort();
! }
! pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock);
! pthread_mutex_unlock(&slon_wait_listen_lock);
! /*
! * Enable all nodes that are active
! */
! rtcfg_doActivate();
! /*
! * Create the local cleanup thread that will remove old events and log
! * data.
! */
! if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n",
! strerror(errno));
! slon_abort();
! }
! /*
! * Create the local sync thread that will generate SYNC events if we had
! * local database updates.
! */
! if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n",
! strerror(errno));
! slon_abort();
! }
! #ifdef HAVE_NETSNMP
! if (pthread_create(&local_snmp_thread, NULL, snmpThread_main, NULL) < 0)
{
! slon_log(SLON_FATAL, "main: cannot create snmpThread -%s\n",
! strerror(errno));
! slon_abort();
}
! #endif
! /*
! * Wait until the scheduler has shut down all remote connections
! */
! slon_log(SLON_DEBUG1, "main: running scheduler mainloop\n");
! if (sched_wait_mainloop() < 0)
{
! slon_log(SLON_FATAL, "main: scheduler returned with error\n");
! slon_abort();
}
- slon_log(SLON_DEBUG1, "main: scheduler mainloop returned\n");
! /*
! * Wait for all remote threads to finish
! */
! main_thread = pthread_self();
! main_argv = argv;
! signal(SIGALRM, sigalrmhandler);
! alarm(20);
! rtcfg_joinAllRemoteThreads();
! alarm(0);
! /*
! * Wait for the local threads to finish
! */
! if (pthread_join(local_event_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n",
! strerror(errno));
!
! if (pthread_join(local_cleanup_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n",
! strerror(errno));
!
! if (pthread_join(local_sync_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n",
! strerror(errno));
! #ifdef HAVE_NETSNMP
! if (pthread_kill(local_snmp_thread, SIGINT) < 0)
! slon_log(SLON_ERROR, "main: cannot join snmpThread - %s\n",
! strerror(errno));
! #endif
! if (slon_restart_request)
{
! slon_log(SLON_DEBUG1, "main: restart requested\n");
! execvp(argv[0], argv);
! slon_log(SLON_FATAL,
! "main: cannot restart via execvp(): %s\n", strerror(errno));
! exit(-1);
}
! /*
! * That's it.
! */
! slon_log(SLON_DEBUG1, "main: done\n");
! return 0;
}
-
void
slon_exit(int code)
{
! if (pid_file)
{
unlink(pid_file);
}
- exit(code);
- }
-
! static void
! sigalrmhandler(int signo)
! {
! if (main_thread == pthread_self())
! {
! alarm(0);
! slon_log(SLON_WARN, "main: shutdown timeout\n");
! if (slon_restart_request)
! {
! execvp(main_argv[0], main_argv);
! slon_log(SLON_FATAL,
! "main: cannot restart via execvp(): %s\n", strerror(errno));
! }
! exit(-1);
! }
! pthread_kill(main_thread, SIGALRM);
}
/*
* Local Variables:
* tab-width: 4
--- 262,866 ----
}
}
/*
! * Pipes to be used as communication devices between the parent (watchdog)
! * and child (worker) processes.
*/
! if (pipe(watchdog_pipe) < 0)
! {
! slon_log(SLON_FATAL, "slon: parent pipe create failed -(%d) %s\n", errno,strerror(errno));
slon_exit(-1);
! }
! if (pipe(sched_wakeuppipe) < 0)
{
! slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno,strerror(errno));
slon_exit(-1);
}
/*
! * Fork here to allow parent process to trap signals and child process to
! * handle real processing work creating a watchdog and worker process
! * hierarchy
*/
! if ((slon_cpid = fork()) < 0)
{
! slon_log(SLON_FATAL, "Fork failed -(%d) %s\n", errno,strerror(errno));
slon_exit(-1);
}
! else if (slon_cpid == 0) /* child */
{
! slon_pid = getpid();
! slon_ppid = getppid();
!
! slon_log(SLON_DEBUG2, "main: main process started\n");
! /*
! * Wait for the parent process to initialize
! */
! if (read(watchdog_pipe[0], &pipe_c, 1) != 1)
! {
! slon_log(SLON_FATAL, "main: read from parent pipe failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
}
!
! if (pipe_c != 'p')
{
! slon_log(SLON_FATAL, "main: incorrect data from parent pipe -(%c)\n",pipe_c);
! slon_exit(-1);
}
! slon_log(SLON_DEBUG2, "main: begin signal handler setup\n");
!
! if (signal(SIGHUP,SIG_IGN) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGINT,SIG_IGN) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGTERM,SIG_IGN) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGCHLD,SIG_IGN) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGCHLD signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGQUIT,SIG_IGN) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
!
! slon_log(SLON_DEBUG2, "main: end signal handler setup\n");
!
! /*
! * Start the event scheduling system
! */
! slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n");
! if (sched_start_mainloop() < 0)
! slon_exit(-1);
!
! slon_log(SLON_CONFIG, "main: loading current cluster configuration\n");
!
! /*
! * Begin a transaction
! */
! res = PQexec(startup_conn,
! "start transaction; "
! "set transaction isolation level serializable;");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! slon_log(SLON_FATAL, "Cannot start transaction - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! slon_exit(-1);
! }
PQclear(res);
! /*
! * Read configuration table sl_node
! */
! dstring_init(&query);
! slon_mkquery(&query,
! "select no_id, no_active, no_comment, "
! " (select coalesce(max(con_seqno),0) from %s.sl_confirm "
! " where con_origin = no_id and con_received = %d) "
! " as last_event "
! "from %s.sl_node "
! "order by no_id; ",
! rtcfg_namespace, rtcfg_nodeid, rtcfg_namespace);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get node list - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
! }
! for (i = 0, n = PQntuples(res); i < n; i++)
! {
! int no_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! int no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0;
! char *no_comment = PQgetvalue(res, i, 2);
! int64 last_event;
!
! if (no_id == rtcfg_nodeid)
! {
! /*
! * Complete our own local node entry
! */
! rtcfg_nodeactive = no_active;
! rtcfg_nodecomment = strdup(no_comment);
! }
! else
! {
! /*
! * Add a remote node
! */
! slon_scanint64(PQgetvalue(res, i, 3), &last_event);
! rtcfg_storeNode(no_id, no_comment);
! rtcfg_setNodeLastEvent(no_id, last_event);
!
! /*
! * If it is active, remember for activation just before we start
! * processing events.
! */
! if (no_active)
! rtcfg_needActivate(no_id);
! }
! }
! PQclear(res);
! /*
! * Read configuration table sl_path - the interesting pieces
! */
! slon_mkquery(&query,
! "select pa_server, pa_conninfo, pa_connretry "
! "from %s.sl_path where pa_client = %d",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get path config - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
! }
! for (i = 0, n = PQntuples(res); i < n; i++)
! {
! int pa_server = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! char *pa_conninfo = PQgetvalue(res, i, 1);
! int pa_connretry = (int)strtol(PQgetvalue(res, i, 2), NULL, 10);
! rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
! }
PQclear(res);
! /*
! * Load the initial listen configuration
! */
! rtcfg_reloadListen(startup_conn);
!
! /*
! * Read configuration table sl_set
! */
! slon_mkquery(&query,
! "select set_id, set_origin, set_comment "
! "from %s.sl_set",
! rtcfg_namespace);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get set config - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
! }
! for (i = 0, n = PQntuples(res); i < n; i++)
! {
! int set_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! int set_origin = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
! char *set_comment = PQgetvalue(res, i, 2);
! rtcfg_storeSet(set_id, set_origin, set_comment);
! }
! PQclear(res);
!
! /*
! * Read configuration table sl_subscribe - only subscriptions for local node
! */
! slon_mkquery(&query,
! "select sub_set, sub_provider, sub_forward, sub_active "
! "from %s.sl_subscribe "
! "where sub_receiver = %d",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get subscription config - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
! }
! for (i = 0, n = PQntuples(res); i < n; i++)
! {
! int sub_set = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
! int sub_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
! char *sub_forward = PQgetvalue(res, i, 2);
! char *sub_active = PQgetvalue(res, i, 3);
!
! rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward);
! if (*sub_active == 't')
! rtcfg_enableSubscription(sub_set, sub_provider, sub_forward);
! }
! PQclear(res);
!
! /*
! * Remember the last known local event sequence
! */
! slon_mkquery(&query,
! "select coalesce(max(ev_seqno), -1) from %s.sl_event "
! "where ev_origin = '%d'",
! rtcfg_namespace, rtcfg_nodeid);
! res = PQexec(startup_conn, dstring_data(&query));
! if (PQresultStatus(res) != PGRES_TUPLES_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! dstring_free(&query);
! slon_exit(-1);
! }
! if (PQntuples(res) == 0)
! strcpy(rtcfg_lastevent, "-1");
! else if (PQgetisnull(res, 0, 0))
! strcpy(rtcfg_lastevent, "-1");
! else
! strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0));
PQclear(res);
dstring_free(&query);
! slon_log(SLON_DEBUG2,
! "main: last local event sequence = %s\n",
! rtcfg_lastevent);
!
! /*
! * Rollback the transaction we used to get the config snapshot
! */
! res = PQexec(startup_conn, "rollback transaction;");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n",
! PQresultErrorMessage(res));
! PQclear(res);
! slon_exit(-1);
! }
! PQclear(res);
!
! /*
! * Done with the startup, don't need the local connection any more.
! */
! PQfinish(startup_conn);
!
! slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n");
!
! /*
! * Create the local event thread that monitors the local node
! * for administrative events to adjust the configuration at
! * runtime. We wait here until the local listen thread has
! * checked that there is no other slon daemon running.
! */
! pthread_mutex_lock(&slon_wait_listen_lock);
! if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n",
! strerror(errno));
! slon_abort();
! }
! pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock);
! pthread_mutex_unlock(&slon_wait_listen_lock);
!
! /*
! * Enable all nodes that are active
! */
! rtcfg_doActivate();
!
! /*
! * Create the local cleanup thread that will remove old events and log
! * data.
! */
! if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n",
! strerror(errno));
! slon_abort();
! }
!
! /*
! * Create the local sync thread that will generate SYNC events if we had
! * local database updates.
! */
! if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n",
! strerror(errno));
! slon_abort();
! }
! #ifdef HAVE_NETSNMP
! if (pthread_create(&local_snmp_thread, NULL, snmpThread_main, NULL) < 0)
! {
! slon_log(SLON_FATAL, "main: cannot create snmpThread -%s\n",
! strerror(errno));
! slon_abort();
! }
! #endif
! /*
! * Wait until the scheduler has shut down all remote connections
! */
! slon_log(SLON_DEBUG1, "main: running scheduler mainloop\n");
! if (sched_wait_mainloop() < 0)
! {
! slon_log(SLON_FATAL, "main: scheduler returned with error\n");
! slon_abort();
! }
! slon_log(SLON_DEBUG1, "main: scheduler mainloop returned\n");
!
! /*
! * Wait for all remote threads to finish
! */
! main_thread = pthread_self();
! signal(SIGALRM, main_sigalrmhandler);
! alarm(20);
!
! slon_log(SLON_DEBUG2, "main: wait for remote threads\n");
! rtcfg_joinAllRemoteThreads();
!
! alarm(0);
!
! /*
! * Wait for the local threads to finish
! */
! if (pthread_join(local_event_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n",
! strerror(errno));
!
! if (pthread_join(local_cleanup_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n",
! strerror(errno));
!
! if (pthread_join(local_sync_thread, NULL) < 0)
! slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n",
! strerror(errno));
!
! #ifdef HAVE_NETSNMP
! if (pthread_kill(local_snmp_thread, SIGINT) < 0)
! slon_log(SLON_ERROR, "main: cannot join snmpThread - %s\n",
! strerror(errno));
! #endif
!
! /*
! * Tell parent that worker is done
! */
! slon_log(SLON_DEBUG2, "main: notify parent that worker is done\n");
!
! if (write(watchdog_pipe[1], "c", 1) != 1)
! {
! slon_log(SLON_FATAL, "main: write to watchdog pipe failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
!
! slon_log(SLON_DEBUG1, "main: done\n");
!
! exit(0);
}
! else /* parent */
{
! slon_log(SLON_DEBUG2, "slon: watchdog process started\n");
!
! /*
! * Install signal handlers
! */
!
! slon_log(SLON_DEBUG2, "slon: begin signal handler setup\n");
!
! act.sa_handler = &sighandler;
! sigemptyset(&act.sa_mask);
! act.sa_flags = SA_NOMASK;
!
! if (sigaction(SIGHUP,&act,NULL) < 0)
! {
! slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGINT,sighandler) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGTERM,sighandler) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGCHLD,sighandler) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGCHLD signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
! if (signal(SIGQUIT,sighandler) == SIG_ERR)
! {
! slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
!
! slon_log(SLON_DEBUG2, "slon: end signal handler setup\n");
!
! /*
! * Tell worker/scheduler that parent has completed initialization
! */
! if (write(watchdog_pipe[1], "p", 1) != 1)
! {
! slon_log(SLON_FATAL, "slon: write to pipe failed -(%d) %s\n", errno,strerror(errno));
! slon_exit(-1);
! }
!
! slon_log(SLON_DEBUG2, "slon: wait for main child process\n");
!
! while ((pid = wait(&child_status)) != slon_cpid)
! {
! slon_log(SLON_DEBUG2, "slon: child terminated status: %d; pid: %d, current worker pid: %d\n", child_status, pid, slon_cpid);
! }
!
! slon_log(SLON_DEBUG1, "slon: done\n");
!
! /*
! * That's it.
! */
! slon_exit(0);
}
! }
!
! /*
!  * main_sigalrmhandler
!  *
!  * SIGALRM handler used by the worker's main() as a shutdown deadline:
!  * main() stores its own thread id in main_thread, installs this handler
!  * and calls alarm(20) before joining the remote threads.
!  *
!  * A process-directed SIGALRM may be delivered to any thread, so a thread
!  * other than main_thread only forwards the signal to main_thread.  In
!  * main_thread the worker gives up waiting: it cancels the alarm, asks the
!  * watchdog parent (slon_ppid) to shut down immediately with SIGQUIT and
!  * exits with -1.
!  *
!  * NOTE(review): exit() is not async-signal-safe (_exit() is), and
!  * slon_log() presumably is not either -- confirm this is acceptable on
!  * this last-resort path.
!  */
! static void
! main_sigalrmhandler(int signo)
! {
! if (main_thread == pthread_self())
{
! alarm(0);
! slon_log(SLON_WARN, "main: shutdown timeout exiting\n");
! /* the watchdog answers SIGQUIT by SIGKILLing this worker and exiting */
! kill(slon_ppid,SIGQUIT);
! exit(-1);
}
else
! {
! /* not the thread running main(): hand the signal over to it */
! slon_log(SLON_WARN, "main: force SIGALRM the main thread\n");
! pthread_kill(main_thread,SIGALRM);
! }
! }
! /*
!  * sighandler
!  *
!  * Signal handler of the watchdog (parent) process.  main() installs it for
!  * SIGHUP (via sigaction() with SA_NOMASK), SIGINT, SIGTERM, SIGCHLD and
!  * SIGQUIT.
!  *
!  *   SIGHUP           restart: shut the worker down, then re-exec slon with
!  *                    its original arguments (main_argv).
!  *   SIGINT, SIGTERM  graceful shutdown: shut the worker down and exit.
!  *   SIGQUIT          immediate shutdown: SIGKILL the worker and exit.
!  *   SIGALRM, SIGCHLD nothing to do here; the worker's exit is collected by
!  *                    the wait() loop in main().
!  *
!  * SA_NOMASK for SIGHUP presumably keeps SIGHUP from still being blocked in
!  * the image started by execvp() from inside this handler -- confirm.
!  *
!  * NOTE(review): the whole shutdown (slon_kill_child()'s bounded waits,
!  * slon_log(), execvp(), exit()) runs in signal context.  execvp() and
!  * exit() are not on the POSIX async-signal-safe list (execve() and _exit()
!  * are), and slon_log() presumably is not safe either.  Also, every path
!  * exits with -1, even a requested SIGINT/SIGTERM shutdown -- confirm that
!  * exit status is intended.
!  */
! static void
! sighandler(int signo)
! {
! switch (signo)
{
! case SIGALRM:
! case SIGCHLD:
! break;
!
! case SIGHUP:
! slon_log(SLON_DEBUG1, "slon: restart requested\n");
! slon_kill_child();
! /* execvp() only returns on failure */
! execvp(main_argv[0], main_argv);
! slon_log(SLON_FATAL, "slon: cannot restart via execvp(): %s\n", strerror(errno));
! slon_exit(-1);
! break;
!
! case SIGINT:
! case SIGTERM:
! slon_log(SLON_DEBUG1, "slon: shutdown requested\n");
! slon_kill_child();
! slon_exit(-1);
! break;
!
! case SIGQUIT:
! /* no handshake: the worker is killed outright */
! slon_log(SLON_DEBUG1, "slon: shutdown now requested\n");
! kill(slon_cpid,SIGKILL);
slon_exit(-1);
+ break;
}
! }
! /*
!  * slon_wait_fd
!  *
!  * Wait up to timeout_sec seconds until fd is writable (for_write != 0) or
!  * readable (for_write == 0).  Returns the select() result: > 0 ready,
!  * 0 timeout, < 0 error with errno set.
!  *
!  * The timeval is rebuilt on every call.  POSIX allows select() to modify
!  * its timeout argument (Linux stores the remaining time there), so reusing
!  * one timeval for two waits makes the second wait's length platform
!  * dependent.
!  */
! static int
! slon_wait_fd(int fd, int for_write, int timeout_sec)
! {
! 	struct timeval tv;
! 	fd_set		fds;
!
! 	tv.tv_sec = timeout_sec;
! 	tv.tv_usec = 0;
! 	FD_ZERO(&fds);
! 	FD_SET(fd, &fds);
!
! 	if (for_write)
! 		return select(fd + 1, NULL, &fds, NULL, &tv);
! 	return select(fd + 1, &fds, NULL, NULL, &tv);
! }
!
! /*
!  * slon_force_kill_child
!  *
!  * Last resort of slon_kill_child(): SIGKILL the worker and terminate the
!  * watchdog through slon_exit(-1).
!  */
! static void
! slon_force_kill_child(void)
! {
! 	kill(slon_cpid, SIGKILL);
! 	slon_exit(-1);
! }
!
! /*
!  * slon_kill_child
!  *
!  * Ask the worker process to shut down and wait until it confirms.  The
!  * watchdog (parent) calls this before a restart or a graceful shutdown.
!  *
!  * Handshake: write 'p' to sched_wakeuppipe[1], then expect the single byte
!  * 'c' on watchdog_pipe[0] (the byte the worker's main() writes after it has
!  * joined its threads).  Each of the two waits is bounded by 60 seconds.
!  * On timeout, select()/write()/read() failure or an unexpected reply byte,
!  * the worker is SIGKILLed and the watchdog exits via slon_exit(-1).
!  * Returns immediately when there is no worker (slon_cpid == 0).
!  *
!  * A select() interrupted by a signal (EINTR) is treated as a failure and
!  * not retried: if the worker dies without sending 'c', the resulting
!  * SIGCHLD ends the wait instead of letting it run for the full timeout.
!  */
! void
! slon_kill_child()
! {
! 	const int	timeout_sec = 60;	/* bound for each of the two waits */
! 	char		pipe_c = '\0';
! 	int			rc;
!
! 	if (slon_cpid == 0)
! 		return;
!
! 	slon_log(SLON_DEBUG2, "slon: notify worker process to shutdown\n");
! 	rc = slon_wait_fd(sched_wakeuppipe[1], 1, timeout_sec);
! 	if (rc == 0)
! 	{
! 		slon_log(SLON_FATAL, "slon: select write to worker timeout\n");
! 		slon_force_kill_child();
! 	}
! 	else if (rc < 0)
! 	{
! 		slon_log(SLON_FATAL, "slon: select write to worker failed -(%d) %s\n",
! 				 errno, strerror(errno));
! 		slon_force_kill_child();
! 	}
!
! 	if (write(sched_wakeuppipe[1], "p", 1) != 1)
! 	{
! 		slon_log(SLON_FATAL, "slon: write to worker pipe failed -(%d) %s\n",
! 				 errno, strerror(errno));
! 		slon_force_kill_child();
! 	}
!
! 	slon_log(SLON_DEBUG2, "slon: wait for worker process to shutdown\n");
! 	rc = slon_wait_fd(watchdog_pipe[0], 0, timeout_sec);
! 	if (rc == 0)
! 	{
! 		slon_log(SLON_FATAL, "slon: select read from worker pipe timeout\n");
! 		slon_force_kill_child();
! 	}
! 	else if (rc < 0)
! 	{
! 		slon_log(SLON_FATAL, "slon: select read from worker pipe failed -(%d) %s\n",
! 				 errno, strerror(errno));
! 		slon_force_kill_child();
! 	}
!
! 	if (read(watchdog_pipe[0], &pipe_c, 1) != 1)
! 	{
! 		slon_log(SLON_FATAL, "slon: read from worker pipe failed -(%d) %s\n",
! 				 errno, strerror(errno));
! 		slon_force_kill_child();
! 	}
! 	if (pipe_c != 'c')
! 	{
! 		slon_log(SLON_FATAL, "slon: incorrect data from worker pipe -(%c)\n", pipe_c);
! 		slon_force_kill_child();
! 	}
! 	slon_log(SLON_DEBUG2, "slon: worker process shutdown ok\n");
! }
+ /*
+  * slon_exit
+  *
+  * Common exit path for slon.  Only the process with slon_ppid == 0
+  * (presumably the watchdog / non-forked process, since the forked worker
+  * records its parent there) removes pid_file, if one is set.  Then it logs
+  * the exit code and calls exit(code).
+  *
+  * NOTE(review): also reached from sighandler(), i.e. from signal context,
+  * where exit() is not async-signal-safe.
+  */
void
slon_exit(int code)
{
! if (slon_ppid == 0 && pid_file)
{
+ slon_log(SLON_DEBUG2, "slon: remove pid file\n");
unlink(pid_file);
}
! slon_log(SLON_DEBUG2, "slon: exit(%d)\n",code);
! exit(code);
}
+
/*
* Local Variables:
* tab-width: 4
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.45
diff -c -r1.45 slon.h
*** slon.h 18 Feb 2005 00:15:57 -0000 1.45
--- slon.h 7 Mar 2005 20:01:56 -0000
***************
*** 324,329 ****
--- 324,331 ----
* ----------
*/
extern pid_t slon_pid;
+ extern pid_t slon_ppid; /* watchdog parent pid, used by the forked worker; 0 = no parent */
+ extern pid_t slon_cpid; /* pid of the forked worker child; 0 = no worker */
extern char *rtcfg_cluster_name;
extern char *rtcfg_namespace;
extern char *rtcfg_conninfo;
***************
*** 344,359 ****
*/
#define slon_abort() \
do { \
! kill(slon_pid, SIGTERM); \
pthread_exit(NULL); \
} while (0)
#define slon_restart() \
do { \
! kill(slon_pid, SIGHUP); \
} while (0)
extern void slon_exit(int code);
extern int slon_restart_request;
extern pthread_mutex_t slon_wait_listen_lock;
extern pthread_cond_t slon_wait_listen_cond;
--- 346,363 ----
*/
#define slon_abort() \
do { \
! kill((slon_ppid == 0 ? slon_pid : slon_ppid), SIGTERM); /* watchdog if any, else self */ \
pthread_exit(NULL); \
} while (0)
#define slon_restart() \
do { \
! kill((slon_ppid == 0 ? slon_pid : slon_ppid), SIGHUP); /* watchdog re-execs on SIGHUP */ \
} while (0)
extern void slon_exit(int code);
extern int slon_restart_request;
+ extern int watchdog_pipe[]; /* 'p' = watchdog initialized, 'c' = worker finished */
+ extern int sched_wakeuppipe[]; /* watchdog writes 'p' here to request worker shutdown */
extern pthread_mutex_t slon_wait_listen_lock;
extern pthread_cond_t slon_wait_listen_cond;
- Previous message: [Slony1-general] error
- Next message: [Slony1-general] signal handling watchdog using forked processes
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-general mailing list