Mon Mar 7 23:27:08 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Added scanner support for include and define Flex scanner
- Next message: [Slony1-commit] By darcyb: Add new conf option: sql_on_connection.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Major changes to log shipping, allowing it to support substantially
all events (to the degree supportable). Notably, COPY_SET now
copies the contents of tables in newly subscribed sets.
Also includes a new event, ACCEPT_SET, which addresses a race condition
where updates might be lost. If ACCEPT_SET is received before the
MOVE_SET has been processed, then the slon will wait until it has
received both.
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.sql (r1.55 -> r1.56)
slony1-engine/src/slon:
cleanup_thread.c (r1.19 -> r1.20)
remote_worker.c (r1.76 -> r1.77)
slon.h (r1.45 -> r1.46)
Added Files:
-----------
slony1-engine/src/ducttape:
test_8_logship (r1.1)
-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.55 -r1.56
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -1877,6 +1877,21 @@
end loop;
end if;
+ -- On the new origin, raise an event - ACCEPT_SET
+ if v_local_node_id = p_new_origin then
+ -- Find the event number from the origin
+ select max(ev_seqno) as seqno into v_sub_row
+ from @NAMESPACE@.sl_event
+ where ev_type = ''MOVE_SET'' and
+ ev_data1 = p_set_id and
+ ev_data2 = p_old_origin and
+ ev_data3 = p_new_origin and
+ ev_origin = p_old_origin;
+
+ perform @NAMESPACE@.createEvent(''_@CLUSTERNAME@'', ''ACCEPT_SET'',
+ p_set_id, p_old_origin, p_new_origin, v_sub_row.seqno);
+ end if;
+
-- ----
-- Next we have to reverse the subscription path
-- ----
@@ -4894,6 +4909,36 @@
--
-- Called by slonik during the function upgrade process.
-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.add_missing_table_field (text, text, text, text)
+returns bool as '
+DECLARE
+ p_namespace alias for $1;
+ p_table alias for $2;
+ p_field alias for $3;
+ p_type alias for $4;
+ v_row record;
+ v_query text;
+BEGIN
+ select 1 into v_row from pg_namespace n, pg_class c, pg_attribute a
+ where quote_ident(n.nspname) = p_namespace and
+ c.relnamespace = n.oid and
+ quote_ident(c.relname) = p_table and
+ a.attrelid = c.oid and
+ quote_ident(a.attname) = p_field;
+ if not found then
+ raise notice ''Upgrade table %.% - add field %'', p_namespace, p_table, p_field;
+ v_query := ''alter table '' || p_namespace || ''.'' || p_table || '' add column '';
+ v_query := v_query || p_field || '' '' || p_type || '';'';
+ execute v_query;
+ return ''t'';
+ else
+ return ''f'';
+ end if;
+END;' language plpgsql;
+
+comment on function @NAMESPACE@.add_missing_table_field (text, text, text, text)
+is 'Add a column of a given type to a table if it is missing';
+
create or replace function @NAMESPACE@.upgradeSchema(text)
returns text as '
@@ -4945,7 +4990,6 @@
execute ''alter table @NAMESPACE@.sl_node add column no_spool boolean'';
update @NAMESPACE@.sl_node set no_spool = false;
end if;
-
return p_old;
end;
' language plpgsql;
@@ -4990,6 +5034,6 @@
where con_origin = @NAMESPACE@.getLocalNodeId('_@CLUSTERNAME@')
group by 1, 2
);
-comment on view @NAMESPACE@.sl_status is 'View showing how far behind remote nodes are.
-';
+
+comment on view @NAMESPACE@.sl_status is 'View showing how far behind remote nodes are.';
--- /dev/null
+++ src/ducttape/test_8_logship
@@ -0,0 +1,310 @@
+#!/bin/sh
+# $Id: test_8_logship,v 1.1 2005/03/07 23:27:02 cbbrowne Exp $
+# **********
+# test_8_logship
+#
+# This test script creates a standalone pgbench database
+# as slony_test1 and then:
+#
+# - initializes a primary node and starts the node daemon
+# - creates a set containing all 4 pgbench tables
+# - creates a second database as slony_test2
+# - adds database slony_test2 to the system
+# - starts the second replication daemon
+# - creates the pgbench tables (schema only)
+# - subscribes the replication set from the primary node
+#
+# The nature of the test has to do with the use of the new slonik
+# log shipping functionality...
+# **********
+
+export PATH
+TMPOUT=/tmp/output.$$
+LOGSHIPDIR=/tmp/logs.$$
+mkdir -p $LOGSHIPDIR
+DB1=slony_test1
+DB2=slony_test2
+CLUSTERNAME=T1
+PGBENCH_SCALE=1
+PGBENCH_CLIENTS=5
+PGBENCH_TRANS=`expr 30000 / $PGBENCH_CLIENTS`
+DEBUGLEVEL=4
+
+trap '
+ echo ""
+ echo "**** user abort"
+ if [ ! -z $pgbench_pid ] ; then
+ echo "**** killing pgbench"
+ kill -15 $pgbench_pid
+ fi
+ if [ ! -z $slon1_pid ] ; then
+ echo "**** killing node daemon 1"
+ kill -15 $slon1_pid
+ fi
+ if [ ! -z $slon2_pid ] ; then
+ echo "**** killing node daemon 2"
+ kill -15 $slon2_pid
+ fi
+ exit 1
+' 2 15
+
+######################################################################
+# Preparations ... create a standalone pgbench database and
+# have the "application" (pgbench) running.
+######################################################################
+
+#####
+# Make sure the install is up to date
+#####
+WGM=`which gmake`
+if [ -z $WGM ] ; then
+ MAKE=make
+ CGNU=`make -v | grep GNU`
+ if [ -z $CGNU ] ; then
+ echo "GNU Make not found - please install GNU Make"
+ exit 1
+ fi
+else
+ MAKE=gmake
+fi
+echo -n "**** running 'make install' in src directory ... "
+if ! ${MAKE} -C .. install >$TMPOUT 2>&1 ; then
+ echo "failed"; cat $TMPOUT; rm $TMPOUT; exit 1
+fi
+echo "done"
+rm $TMPOUT
+
+PREAMBLE_FILE=/tmp/preamble.$$
+cat <<EOF > $PREAMBLE_FILE
+define origin 11;
+define sub1 22;
+cluster name = $CLUSTERNAME;
+node @origin admin conninfo='dbname=$DB1';
+node @sub1 admin conninfo='dbname=$DB2';
+EOF
+
+
+#####
+# Remove old databases, if they exist
+#####
+echo "**** remove old test databases"
+dropdb $DB1 || echo "**** ignored"
+sleep 1
+dropdb $DB2 || echo "**** ignored"
+sleep 1
+
+#####
+# Create the "Primary Node"
+#####
+echo "**** creating database for Node 11"
+
+createdb $DB1 || exit 1
+pgbench -i -s $PGBENCH_SCALE $DB1
+pg_dump -s $DB1 >pgbench_schema.sql
+
+#####
+# Start pgbench in the background and give it rampup time
+#####
+pgbench -n -s $PGBENCH_SCALE -c $PGBENCH_CLIENTS -t $PGBENCH_TRANS $DB1 &
+pgbench_pid=$!
+echo "**** pgbench is running in background with pid $pgbench_pid"
+echo -n "**** sleeping 10 seconds to give pgbench time for rampup ... "
+sleep 10
+echo "done"
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB1 is now a standalone database with a running pgbench"
+echo "**********************************************************************"
+echo ""
+
+######################################################################
+# Setup DB1 as the primary cluster T1 node, start the node daemon,
+# and create a replication set containing the pgbench tables.
+######################################################################
+
+echo "**** initializing $DB1 as Primary Node for Slony-I cluster $CLUSTERNAME"
+slonik <<_EOF_
+ include <$PREAMBLE_FILE>;
+ init cluster (id = @origin, comment = 'Node @origin');
+ echo 'Database $DB1 initialized as Node 11';
+_EOF_
+if [ $? -ne 0 ] ; then
+ kill $pgbench_pid;
+ exit 1
+fi
+
+echo "**** starting the Slony-I node daemon for $DB1"
+xterm -title "Slon node 11" -e sh -c "slon -d$DEBUGLEVEL -s500 -g10 $CLUSTERNAME dbname=$DB1; echo -n 'Enter>'; read line" &
+slon1_pid=$!
+echo "slon[$slon1_pid] on dbname=$DB1"
+
+echo "**** creating a replication set containing the 4 pgbench tables ... "
+slonik <<_EOF_
+ include <$PREAMBLE_FILE>;
+ try {
+ table add key (node id = @origin, fully qualified name = 'public.history');
+ }
+ on error {
+ exit 1;
+ }
+
+ try {
+ create set (id = 1, origin = @origin, comment = 'Set 1 - pgbench tables');
+ set add table (set id = 1, origin = @origin,
+ id = 1, fully qualified name = 'public.accounts',
+ comment = 'Table accounts');
+ set add table (set id = 1, origin = @origin,
+ id = 2, fully qualified name = 'public.branches',
+ comment = 'Table branches');
+ set add table (set id = 1, origin = @origin,
+ id = 3, fully qualified name = 'public.tellers',
+ comment = 'Table tellers');
+ set add table (set id = 1, origin = @origin,
+ id = 4, fully qualified name = 'public.history',
+ key = serial, comment = 'Table accounts');
+ }
+ on error {
+ exit 1;
+ }
+_EOF_
+
+if [ $? -ne 0 ] ; then
+ echo "failed"
+ kill $pgbench_pid 2>/dev/null
+ kill $slon1_pid 2>/dev/null
+ cat $TMPOUT
+ rm $TMPOUT
+ exit 1
+fi
+echo "**** set created"
+
+#####
+# Check that pgbench is still running
+#####
+if ! kill -0 $pgbench_pid 2>/dev/null ; then
+ echo "**** pgbench terminated ???"
+ kill $slon1_pid 2>/dev/null
+ exit 1
+fi
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB1 is now the Slony-I origin for set 1"
+echo "**********************************************************************"
+echo ""
+
+######################################################################
+# Setup DB2 as a subscriber node and let it subscribe the replication
+# set of the running pgbench
+######################################################################
+echo "**** creating database for node 22"
+if ! createdb $DB2 ; then
+ kill $pgbench_pid 2>/dev/null
+ kill $slon1_pid 2>/dev/null
+ exit 1
+fi
+
+echo "**** initializing $DB2 as node 22 of Slony-I cluster $CLUSTERNAME"
+slonik <<_EOF_
+ include <$PREAMBLE_FILE>;
+ echo 'Creating node 22';
+ try {
+ store node (id = @sub1, comment = 'node @sub1', event node = @origin);
+ } on error {
+ echo 'could not establish node @sub1';
+ exit -1;
+ }
+ try {
+ store path (server = @origin, client = @sub1, conninfo = 'dbname=$DB1');
+ store path (server = @sub1, client = @origin, conninfo = 'dbname=$DB2');
+ }
+ on error {
+ echo 'could not establish paths between @origin and @sub1';
+ exit -1;
+ }
+ echo 'Database $DB2 added as node @sub1';
+_EOF_
+if [ $? -ne 0 ] ; then
+ kill $pgbench_pid 2>/dev/null
+ kill $slon1_pid 2>/dev/null
+ exit 1
+fi
+
+echo "**** starting the Slony-I node daemon for $DB1"
+xterm -title "Slon node 22" -e sh -c "slon -d$DEBUGLEVEL -s10000 -o10000 -g10 -a $LOGSHIPDIR $CLUSTERNAME dbname=$DB2; echo -n 'Enter>'; read line" &
+slon2_pid=$!
+echo "slon[$slon2_pid] on dbname=$DB2"
+
+#####
+# Check that pgbench is still running
+#####
+if ! kill -0 $pgbench_pid 2>/dev/null ; then
+ echo "**** pgbench terminated ???"
+ kill $slon1_pid 2>/dev/null
+ exit 1
+fi
+
+######################################################################
+# And now comes the moment where the big elephant starts to pee
+# and the attendants in the first row climb on their chairs ...
+######################################################################
+echo "**** creating pgbench tables and subscribing node 22 to set 1"
+(
+ cat pgbench_schema.sql
+) | psql -q $DB2
+slonik <<_EOF_
+ include <$PREAMBLE_FILE>;
+ subscribe set ( id = 1, provider = @origin, receiver = @sub1, forward = yes );
+_EOF_
+
+echo ""
+echo "**********************************************************************"
+echo "**** $DB2 should now be copying data and attempting to catch up."
+echo "**********************************************************************"
+echo ""
+
+echo -n "**** waiting for pgbench to finish "
+while kill -0 $pgbench_pid 2>/dev/null ; do
+ echo -n "."
+ sleep 10
+done
+echo "**** pgbench finished"
+echo "**** please terminate the replication engines when caught up."
+wait $slon1_pid
+wait $slon2_pid
+
+kill $pgbench_pid 2>/dev/null
+kill $slon1_pid 2>/dev/null
+kill $slon2_pid 2>/dev/null
+
+echo -n "**** comparing databases ... "
+psql $DB1 >dump.tmp.1.$$ <<_EOF_
+ select 'accounts:'::text, aid, bid, abalance, filler
+ from accounts order by aid;
+ select 'branches:'::text, bid, bbalance, filler
+ from branches order by bid;
+ select 'tellers:'::text, tid, bid, tbalance, filler
+ from tellers order by tid;
+ select 'history:'::text, tid, bid, aid, delta, mtime, filler,
+ "_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID";
+_EOF_
+psql $DB2 >dump.tmp.2.$$ <<_EOF_
+ select 'accounts:'::text, aid, bid, abalance, filler
+ from accounts order by aid;
+ select 'branches:'::text, bid, bbalance, filler
+ from branches order by bid;
+ select 'tellers:'::text, tid, bid, tbalance, filler
+ from tellers order by tid;
+ select 'history:'::text, tid, bid, aid, delta, mtime, filler,
+ "_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID";
+_EOF_
+
+if diff dump.tmp.1.$$ dump.tmp.2.$$ >test_1.diff ; then
+ echo "success - databases are equal."
+ rm dump.tmp.?.$$
+ rm test_1.diff
+else
+ echo "FAILED - see test_1.diff for database differences"
+fi
+rm $PREAMBLE_FILE
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.76
retrieving revision 1.77
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.76 -r1.77
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -249,7 +249,9 @@
static int generate_archive_header (int node_id, char *seqbuf);
static int submit_query_to_archive(SlonDString *ds);
static int submit_string_to_archive (const char *s);
+static int submit_raw_data_to_archive (const char *s);
static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf);
+static int write_void_log (int node_id, char *seqbuf, const char *message);
#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
@@ -587,6 +589,9 @@
no_id, no_comment, no_spool);
need_reloadListen = true;
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE");
+
}
else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
{
@@ -601,6 +606,9 @@
no_id);
need_reloadListen = true;
+
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
}
else if (strcmp(event->ev_type, "DROP_NODE") == 0)
{
@@ -650,6 +658,8 @@
rtcfg_cluster_name);
need_reloadListen = true;
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE");
}
else if (strcmp(event->ev_type, "STORE_PATH") == 0)
{
@@ -667,6 +677,8 @@
pa_server, pa_client, pa_conninfo, pa_connretry);
need_reloadListen = true;
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH");
}
else if (strcmp(event->ev_type, "DROP_PATH") == 0)
{
@@ -682,6 +694,8 @@
pa_server, pa_client);
need_reloadListen = true;
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH");
}
else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
{
@@ -696,6 +710,8 @@
"select %s.storeListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
}
else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
{
@@ -710,6 +726,8 @@
"select %s.dropListen_int(%d, %d, %d); ",
rtcfg_namespace,
li_origin, li_provider, li_receiver);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
}
else if (strcmp(event->ev_type, "STORE_SET") == 0)
{
@@ -724,6 +742,9 @@
"select %s.storeSet_int(%d, %d, '%q'); ",
rtcfg_namespace,
set_id, set_origin, set_comment);
+
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET");
}
else if (strcmp(event->ev_type, "DROP_SET") == 0)
{
@@ -734,18 +755,46 @@
slon_appendquery(&query1,
"select %s.dropSet_int(%d); ",
rtcfg_namespace, set_id);
+
+ /* The table deleted needs to be
+ * dropped from log shipping too */
+ if (archive_dir) {
+ rc = open_log_archive(rtcfg_nodeid, seqbuf);
+ rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+ slon_mkquery(&query1,
+ "delete from %s.sl_setsync_offline "
+ " where ssy_setid= %d;",
+ rtcfg_namespace, set_id);
+ rc = submit_query_to_archive(&query1);
+ rc = close_log_archive();
+ }
}
else if (strcmp(event->ev_type, "MERGE_SET") == 0)
{
int set_id = (int)strtol(event->ev_data1, NULL, 10);
int add_id = (int)strtol(event->ev_data2, NULL, 10);
-
+ char dropquery[280];
rtcfg_dropSet(add_id);
slon_appendquery(&query1,
"select %s.mergeSet_int(%d, %d); ",
rtcfg_namespace,
set_id, add_id);
+
+ /* Log shipping gets the change here
+ * that we need to delete the table
+ * being merged from the set being
+ * maintained. */
+ if (archive_dir) {
+ rc = open_log_archive(rtcfg_nodeid, seqbuf);
+ rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+ rc = slon_mkquery(&query1,
+ "delete from %s.sl_setsync_offline "
+ " where ssy_setid= %d;",
+ rtcfg_namespace, add_id);
+ rc = submit_query_to_archive(&query1);
+ rc = close_log_archive();
+ }
}
else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
{
@@ -754,6 +803,8 @@
* subscribed sets yet and table information is not maintained
* in the runtime configuration.
*/
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
}
else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
{
@@ -762,6 +813,8 @@
* subscribed sets yet and sequences information is not
* maintained in the runtime configuration.
*/
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
}
else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
{
@@ -770,6 +823,8 @@
slon_appendquery(&query1, "select %s.setDropTable_int(%d);",
rtcfg_namespace,
tab_id);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
}
else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
{
@@ -778,6 +833,8 @@
slon_appendquery(&query1, "select %s.setDropSequence_int(%d);",
rtcfg_namespace,
seq_id);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
}
else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
{
@@ -787,6 +844,8 @@
slon_appendquery(&query1, "select %s.setMoveTable_int(%d, %d);",
rtcfg_namespace,
tab_id, new_set_id);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
}
else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
{
@@ -796,6 +855,8 @@
slon_appendquery(&query1, "select %s.setMoveSequence_int(%d, %d);",
rtcfg_namespace,
seq_id, new_set_id);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
}
else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
{
@@ -806,6 +867,8 @@
"select %s.storeTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
}
else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
{
@@ -816,6 +879,70 @@
"select %s.dropTrigger_int(%d, '%q'); ",
rtcfg_namespace,
trig_tabid, trig_tgname);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
+ }
+ else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
+ {
+ int set_id = (int) strtol(event->ev_data1, NULL, 10);
+ int old_origin = (int) strtol(event->ev_data2, NULL, 10);
+ int new_origin = (int) strtol(event->ev_data3, NULL, 10);
+ int seq_no = (int) strtol(event->ev_data4, NULL, 10);
+ PGresult *res;
+
+ /* If we're a remote node, and haven't yet
+ * received the MOVE_SET event from the
+ * new origin, then we'll need to sleep a
+ * bit... This avoids a race condition
+ * where new SYNCs take place on the new
+ * origin, and are ignored on some
+ * subscribers (and their children)
+ * because the MOVE_SET wasn't yet
+ * received and processed */
+
+ if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
+ slon_mkquery(&query1,
+ "select 1 from %s.sl_event accept "
+ "where "
+ " accept.ev_type = 'ACCEPT_SET' and "
+ " accept.ev_origin = %d and "
+ " accept.ev_data1 = %d and "
+ " accept.ev_data2 = %d and "
+ " accept.ev_data3 = %d and "
+ " accept.ev_data4 = %d and "
+ " not exists "
+ " (select 1 from %s.sl_event move "
+ " where "
+ " accept.ev_origin = move.ev_data3 and "
+ " move.ev_type = 'MOVE_SET' and "
+ " move.ev_data1 = accept.ev_data1 and "
+ " move.ev_data2 = accept.ev_data2 and "
+ " move.ev_data3 = accept.ev_data3 and "
+ " move.ev_seqno = %d); ",
+
+ rtcfg_namespace,
+ old_origin, set_id, old_origin, new_origin, seq_no,
+ rtcfg_namespace, seq_no);
+ res = PQexec(local_dbconn, dstring_data(&query1));
+ while (PQntuples(res) > 0) {
+ int sleeptime = 15;
+ int sched_rc;
+ slon_log(SLON_WARN, "remoteWorkerThread_%d: "
+ "accept set: node has not yet received MOVE_SET event "
+ "for set %d old origin %d new origin - sleep %d seconds\n",
+ rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime);
+ sched_rc = sched_msleep(node, sleeptime * 1000);
+ if (sched_rc != SCHED_STATUS_OK) {
+ event_ok = false;
+ break;
+ } else {
+ if (sleeptime < 60)
+ sleeptime *= 2;
+ }
+ res = PQexec(local_dbconn, dstring_data(&query1));
+ }
+ }
+
}
else if (strcmp(event->ev_type, "MOVE_SET") == 0)
{
@@ -831,6 +958,7 @@
* that, we need to execute it now and select the resulting
* provider for us.
*/
+
slon_appendquery(&query1,
"select %s.moveSet_int(%d, %d, %d); ",
rtcfg_namespace,
@@ -880,6 +1008,8 @@
rtcfg_namespace,
failed_node, backup_node, set_id);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
@@ -896,7 +1026,8 @@
"select %s.subscribeSet_int(%d, %d, %d, '%q'); ",
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
-
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
@@ -1004,7 +1135,9 @@
rtcfg_namespace,
sub_set, sub_provider, sub_receiver);
}
-
+ /* Note: No need to do anything based
+ on archive_dir here; copy_set does
+ that nicely already. */
need_reloadListen = true;
}
else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0)
@@ -1022,6 +1155,16 @@
sub_set, sub_receiver);
need_reloadListen = true;
+ if (archive_dir) {
+ rc = open_log_archive(rtcfg_nodeid, seqbuf);
+ rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+ slon_mkquery(&query1,
+ "delete from %s.sl_setsync_offline "
+ " where ssy_setid= %d;",
+ rtcfg_namespace, sub_set);
+ rc = submit_query_to_archive(&query1);
+ rc = close_log_archive();
+ }
}
else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
{
@@ -1087,11 +1230,15 @@
"select %s.updateReloid(%d, '%q', %d); ",
rtcfg_namespace,
reset_config_setid, reset_configonly_on_node);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- RESET_CONFIG");
}
else
{
printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
+ if (archive_dir)
+ write_void_log (rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!");
}
/*
@@ -2107,6 +2254,33 @@
dstring_init(&query1);
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
+
+ /* Log Shipping Support begins... */
+ /* - Open the log, put the header in
+ Isn't it convenient that seqbuf was just populated??? :-)
+ */
+ if (archive_dir) {
+ rc = open_log_archive(rtcfg_nodeid, seqbuf);
+ if (rc < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not open COPY SET archive file %s - %s",
+ node->no_id, archive_tmp, strerror(errno));
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ rc = generate_archive_header(rtcfg_nodeid, seqbuf);
+ if (rc < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Could not generate COPY SET archive header %s - %s",
+ node->no_id, archive_tmp, strerror(errno));
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ }
/*
* Listen on the special relation telling what node daemon this connection
* belongs to.
@@ -2118,6 +2292,7 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
@@ -2147,6 +2322,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
if (*(PQgetvalue(res1, 0, 0)) == 't')
@@ -2157,6 +2333,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
PQclear(res1);
@@ -2170,6 +2347,7 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
}
@@ -2199,6 +2377,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2236,6 +2415,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2259,6 +2439,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2279,6 +2460,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
@@ -2314,6 +2496,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
@@ -2334,6 +2517,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ntuples2 = PQntuples(res2);
@@ -2348,6 +2532,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
}
@@ -2356,6 +2541,10 @@
/*
* Begin a COPY from stdin for the table on the local DB
*/
+ slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: "
+ "Begin COPY of table %s\n",
+ node->no_id, tab_fqname);
+
slon_mkquery(&query1,
"select %s.truncateTable('%s'); "
"copy %s from stdin; ",
@@ -2372,9 +2561,25 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
+ if (archive_dir) {
+ slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname);
+ slon_mkquery(&query1,
+ "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname);
+ rc = submit_query_to_archive(&query1);
+ if (rc < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_d: "
+ "Could not generate copy_set request for %s - %s",
+ node->no_id, tab_fqname, strerror(errno));
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ }
/*
* Begin a COPY to stdout for the table on the provider DB
*/
@@ -2398,6 +2603,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
@@ -2410,7 +2616,6 @@
int len = strlen(copydata);
copysize += (int64) len;
-
if (PQputCopyData(loc_dbconn, copydata, len) != 1)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -2426,8 +2631,30 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
+ if (archive_dir) {
+ rc = fwrite(copydata, 1, len, archive_fp);
+ if (rc != len) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "PQputCopyData() - log shipping - %s",
+ node->no_id, strerror(errno));
+#ifdef SLON_MEMDEBUG
+ memset(copydata, 88, len);
+#endif
+ PQfreemem(copydata);
+ PQputCopyEnd(loc_dbconn, "Slony-I: copy set operation");
+ PQclear(res3);
+ PQclear(res2);
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+
+ }
+ }
#ifdef SLON_MEMDEBUG
memset(copydata, 88, len);
#endif
@@ -2444,6 +2671,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
@@ -2464,6 +2692,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
PQclear(res3);
@@ -2480,6 +2709,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
res2 = PQgetResult(loc_dbconn);
@@ -2493,8 +2723,12 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
+ if (archive_dir) {
+ rc = submit_string_to_archive("\\.");
+ }
#else /* ! HAVE_PQPUTCOPYDATA */
copydone = false;
while (!copydone)
@@ -2517,16 +2751,22 @@
case 0:
PQputline(loc_dbconn, copybuf);
PQputline(loc_dbconn, "\n");
+ if (archive_dir)
+ submit_string_to_archive(copybuf);
break;
case 1:
PQputline(loc_dbconn, copybuf);
+ if (archive_dir)
+ submit_raw_data_to_archive(copybuf);
break;
}
}
}
PQputline(loc_dbconn, "\\.\n");
-
+ if (archive_dir) {
+ rc = submit_string_to_archive("\\\.");
+ }
/*
* End the COPY to stdout on the provider
*/
@@ -2541,6 +2781,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
PQclear(res3);
@@ -2562,6 +2803,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
#endif /* HAVE_PQPUTCOPYDATA */
@@ -2580,8 +2822,13 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
+ if (archive_dir) {
+ submit_query_to_archive(&query1);
+ }
+
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"%.3f seconds to copy table %s\n",
@@ -2616,6 +2863,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2638,6 +2886,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
}
@@ -2670,6 +2919,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ntuples1 = PQntuples(res1);
@@ -2683,6 +2933,8 @@
"set last_value of sequence %s (%s) to %s\n",
node->no_id, seql_seqid, seq_fqname, seql_last_value);
+
+
/*
* sequence with ID 0 is a nodes rowid ... only remember in seqlog.
*/
@@ -2691,6 +2943,10 @@
slon_mkquery(&query1,
"select \"pg_catalog\".setval('%q', '%s'); ",
seq_fqname, seql_last_value);
+
+ if (archive_dir) {
+ submit_query_to_archive(&query1);
+ }
}
else
dstring_reset(&query1);
@@ -2706,6 +2962,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
}
@@ -2749,6 +3006,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
if (PQntuples(res1) != 1)
@@ -2759,6 +3017,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
if (PQgetisnull(res1, 0, 0))
@@ -2805,6 +3064,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
if (PQntuples(res1) != 1)
@@ -2815,6 +3075,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ssy_seqno = PQgetvalue(res1, 0, 0);
@@ -2859,6 +3120,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
ntuples1 = PQntuples(res2);
@@ -2899,6 +3161,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
if (PQntuples(res1) != 1)
@@ -2909,6 +3172,7 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
dstring_init(&ssy_action_list);
@@ -2937,14 +3201,45 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ if (archive_dir) {
+ slon_mkquery(&query1,
+ "insert into %s.sl_setsync_offline () "
+ "values ('%d', '%d');",
+ rtcfg_namespace, set_id, ssy_seqno);
+ rc = submit_query_to_archive(&query1);
+ if (rc < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ " could not insert to sl_setsync_offline",
+ node->no_id);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
+ }
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
"%.3f seconds to build initial setsync status\n",
node->no_id,
TIMEVAL_DIFF(&tv_start2, &tv_now));
+ if (archive_dir) {
+ rc = close_log_archive();
+ if (rc < 0) {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ " could not close archive log %s - %s",
+ node->no_id, archive_tmp, strerror(errno));
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ }
+
+
/*
* Roll back the transaction we used on the provider and close the
* database connection.
@@ -2954,6 +3249,7 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ terminate_log_archive();
return -1;
}
slon_disconnectdb(pro_conn);
@@ -2966,11 +3262,9 @@
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG1, "copy_set %d done in %.3f seconds\n", set_id,
TIMEVAL_DIFF(&tv_start, &tv_now));
-
return 0;
}
-
static int
sync_event(SlonNode * node, SlonConn * local_conn,
WorkerGroupData * wd, SlonWorkMsg_event * event)
@@ -4219,9 +4513,11 @@
int close_log_archive () {
int rc;
+ if (archive_dir) {
rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n");
rc = fclose(archive_fp);
rc = rename(archive_tmp, archive_name);
+ }
return rc;
}
@@ -4231,13 +4527,18 @@
}
/*
 * submit_query_to_archive - append a query held in a SlonDString to the
 * currently open archive log, followed by a newline.
 *
 * Uses the dstring_data() accessor for consistency with the rest of the
 * file (slon_mkquery/PQexec call sites) instead of poking at ds->data.
 *
 * Returns the fprintf() result: number of characters written, < 0 on error.
 * The caller must ensure an archive file is open (archive_fp valid).
 */
int submit_query_to_archive(SlonDString *ds) {
	return fprintf(archive_fp, "%s\n", dstring_data(ds));
}
/*
 * submit_string_to_archive - write one string to the open archive log,
 * terminated by a newline.
 *
 * Returns the fprintf() result: characters written, < 0 on error.
 */
int submit_string_to_archive (const char *s) {
	int written;

	written = fprintf(archive_fp, "%s\n", s);
	return written;
}
+/* Raw form used for COPY where we don't want any extra cr/lf output */
+int submit_raw_data_to_archive (const char *s) {
+ return fprintf(archive_fp, "%s", s);
+}
+
void terminate_log_archive () {
if (archive_fp) {
fclose(archive_fp);
@@ -4248,17 +4549,20 @@
time_t now;
now = time(NULL);
return fprintf(archive_fp,
- "-- Slony-I sync log\n"
+ "-- Slony-I log shipping archive\n"
"-- Node %d, Event %s\n"
"-- at... %s\n"
"start transaction;\n",
node_id, seqbuf, ctime(&now));
}
-/*
- * Local Variables:
- * tab-width: 4
- * c-indent-level: 4
- * c-basic-offset: 4
- * End:
- */
/* write_void_log() writes out a "void" log consisting of the message,
 * which must either be a valid SQL query or a SQL comment.
 *
 * The archive is opened, stamped with the standard header, the message
 * is appended, and the archive is closed/renamed into place.
 *
 * Returns 0 (or non-negative) on success, < 0 on the first failing step.
 * (The original fell off the end of a non-void function - undefined
 * behavior if the caller ever inspected the result.)
 */
int write_void_log (int node_id, char *seqbuf, const char *message) {
	int rc;

	rc = open_log_archive(node_id, seqbuf);
	if (rc < 0)
		return rc;
	rc = generate_archive_header(node_id, seqbuf);
	if (rc < 0)
		return rc;
	rc = submit_string_to_archive(message);
	if (rc < 0)
		return rc;
	return close_log_archive();
}
+
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.45
retrieving revision 1.46
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.45 -r1.46
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -50,7 +50,7 @@
#define SLON_CLEANUP_SLEEP 600 /* sleep 10 minutes between */
/* cleanup calls */
-#define SLON_VACUUM_FREQUENCY 1 /* vacuum every 3rd cleanup */
+#define SLON_VACUUM_FREQUENCY 3 /* vacuum every 3rd cleanup */
typedef enum
Index: cleanup_thread.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.19
retrieving revision 1.20
diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.19 -r1.20
--- src/slon/cleanup_thread.c
+++ src/slon/cleanup_thread.c
@@ -32,7 +32,8 @@
* ---------- Global data ----------
*/
int vac_frequency = SLON_VACUUM_FREQUENCY;
-
+static unsigned long earliest_xid = 0;
+static unsigned long get_earliest_xid (PGconn *dbconn);
/*
* ---------- cleanupThread_main
*
@@ -55,6 +56,7 @@
int n ,
t;
int vac_count = 0;
+ char *vacuum_action;
slon_log(SLON_DEBUG1, "cleanupThread: thread starts\n");
@@ -166,28 +168,44 @@
*/
if (vac_frequency != 0 && ++vac_count >= vac_frequency)
{
+ unsigned long latest_xid;
vac_count = 0;
-
+ latest_xid = get_earliest_xid(dbconn);
+ if (earliest_xid != latest_xid) {
+ vacuum_action = "vacuum analyze";
+ } else {
+ vacuum_action = "analyze";
+ slon_log(SLON_DEBUG4, "cleanupThread: xid %d still active - analyze instead\n",
+ earliest_xid);
+ }
+ earliest_xid = latest_xid;
/*
* Build the query string for vacuuming replication runtime data
* and event tables
*/
dstring_init(&query3);
slon_mkquery(&query3,
- "vacuum analyze %s.sl_event; "
- "vacuum analyze %s.sl_confirm; "
- "vacuum analyze %s.sl_setsync; "
- "vacuum analyze %s.sl_log_1; "
- "vacuum analyze %s.sl_log_2;"
- "vacuum analyze %s.sl_seqlog;"
- "vacuum analyze pg_catalog.pg_listener;",
+ "%s %s.sl_event; "
+ "%s %s.sl_confirm; "
+ "%s %s.sl_setsync; "
+ "%s %s.sl_log_1; "
+ "%s %s.sl_log_2;"
+ "%s %s.sl_seqlog;"
+ "%s pg_catalog.pg_listener;",
+ vacuum_action,
rtcfg_namespace,
+ vacuum_action,
rtcfg_namespace,
+ vacuum_action,
rtcfg_namespace,
+ vacuum_action,
rtcfg_namespace,
+ vacuum_action,
rtcfg_namespace,
- rtcfg_namespace);
-
+ vacuum_action,
+ rtcfg_namespace,
+ vacuum_action
+ );
gettimeofday(&tv_start, NULL);
res = PQexec(dbconn, dstring_data(&query3));
@@ -231,3 +249,41 @@
slon_log(SLON_DEBUG1, "cleanupThread: thread done\n");
pthread_exit(NULL);
}
+
+
+static unsigned long get_earliest_xid (PGconn *dbconn) {
+ unsigned long lo = 2147483647;
+ unsigned long minhi = -1;
+ unsigned long minlo = lo;
+ unsigned long xid;
+ long n,t;
+ PGresult *res;
+ SlonDString query1;
+ dstring_init(&query1);
+ slon_mkquery(&query1, "select transaction from pg_catalog.pg_locks where transaction is not null;");
+ res = PQexec(dbconn, dstring_data(&query1));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+ slon_log(SLON_FATAL, "cleanupThread: could not read locks from pg_locks!");
+ PQclear(res);
+ slon_abort();
+ return -1;
+ } else {
+ n = PQntuples(res);
+ for (t = 0; t < n; t++) {
+ xid = atoi(PQgetvalue(res, t, 0));
+ printf ("xid: %d\n", xid);
+ if (xid > lo) {
+ if (xid < minlo)
+ minlo = xid;
+ } else {
+ if (xid < minhi)
+ minhi = xid;
+ }
+ }
+ }
+ printf("minhi: %d minlo: %d\n", minlo, minhi);
+ if ((minhi - lo) < minlo)
+ return minlo;
+ else
+ return minhi;
+}
- Previous message: [Slony1-commit] By cbbrowne: Added scanner support for include and define Flex scanner
- Next message: [Slony1-commit] By darcyb: Add new conf option: sql_on_connection.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list