Mon Mar 7 23:27:08 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Added scanner support for include and define Flex scanner
- Next message: [Slony1-commit] By darcyb: Add new conf option: sql_on_connection.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Major changes to log shipping, allowing it to support substantially all events (to the degree supportable). Notably, COPY_SET now copies the contents of tables in newly subscribed sets. Also includes a new event, ACCEPT_SET, which addresses a race condition where updates might be lost. If ACCEPT_SET is received before the MOVE_SET has been processed, then the slon will wait until it has received both. Modified Files: -------------- slony1-engine/src/backend: slony1_funcs.sql (r1.55 -> r1.56) slony1-engine/src/slon: cleanup_thread.c (r1.19 -> r1.20) remote_worker.c (r1.76 -> r1.77) slon.h (r1.45 -> r1.46) Added Files: ----------- slony1-engine/src/ducttape: test_8_logship (r1.1) -------------- next part -------------- Index: slony1_funcs.sql =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v retrieving revision 1.55 retrieving revision 1.56 diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.55 -r1.56 --- src/backend/slony1_funcs.sql +++ src/backend/slony1_funcs.sql @@ -1877,6 +1877,21 @@ end loop; end if; + -- On the new origin, raise an event - ACCEPT_SET + if v_local_node_id = p_new_origin then + -- Find the event number from the origin + select max(ev_seqno) as seqno into v_sub_row + from @NAMESPACE at .sl_event + where ev_type = ''MOVE_SET'' and + ev_data1 = p_set_id and + ev_data2 = p_old_origin and + ev_data3 = p_new_origin and + ev_origin = p_old_origin; + + perform @NAMESPACE at .createEvent(''_ at CLUSTERNAME@'', ''ACCEPT_SET'', + p_set_id, p_old_origin, p_new_origin, v_sub_row.seqno); + end if; + -- ---- -- Next we have to reverse the subscription path -- ---- @@ -4894,6 +4909,36 @@ -- -- Called by slonik during the function upgrade process. 
-- ---------------------------------------------------------------------- +create or replace function @NAMESPACE at .add_missing_table_field (text, text, text, text) +returns bool as ' +DECLARE + p_namespace alias for $1; + p_table alias for $2; + p_field alias for $3; + p_type alias for $4; + v_row record; + v_query text; +BEGIN + select 1 into v_row from pg_namespace n, pg_class c, pg_attribute a + where quote_ident(n.nspname) = p_namespace and + c.relnamespace = n.oid and + quote_ident(c.relname) = p_table and + a.attrelid = c.oid and + quote_ident(a.attname) = p_field; + if not found then + raise notice ''Upgrade table %.% - add field %'', p_namespace, p_table, p_field; + v_query := ''alter table '' || p_namespace || ''.'' || p_table || '' add column ''; + v_query := v_query || p_field || '' '' || p_type || '';''; + execute v_query; + return ''t''; + else + return ''f''; + end if; +END;' language plpgsql; + +comment on function @NAMESPACE at .add_missing_table_field (text, text, text, text) +is 'Add a column of a given type to a table if it is missing'; + create or replace function @NAMESPACE at .upgradeSchema(text) returns text as ' @@ -4945,7 +4990,6 @@ execute ''alter table @NAMESPACE at .sl_node add column no_spool boolean''; update @NAMESPACE at .sl_node set no_spool = false; end if; - return p_old; end; ' language plpgsql; @@ -4990,6 +5034,6 @@ where con_origin = @NAMESPACE at .getLocalNodeId('_ at CLUSTERNAME@') group by 1, 2 ); -comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are. 
-'; + +comment on view @NAMESPACE at .sl_status is 'View showing how far behind remote nodes are.'; --- /dev/null +++ src/ducttape/test_8_logship @@ -0,0 +1,310 @@ +#!/bin/sh +# $Id: test_8_logship,v 1.1 2005/03/07 23:27:02 cbbrowne Exp $ +# ********** +# test_8_logship +# +# This test script creates a standalone pgbench database +# as slony_test1 and then: +# +# - initializes a primary node and starts the node daemon +# - creates a set containing all 4 pgbench tables +# - creates a second database as slony_test2 +# - adds database slony_test2 to the system +# - starts the second replication daemon +# - creates the pgbench tables (schema only) +# - subscribes the replication set from the primary node +# +# The nature of the test has to do with the use of the new slonik +# log shipping functionality... +# ********** + +export PATH +TMPOUT=/tmp/output.$$ +LOGSHIPDIR=/tmp/logs.$$ +mkdir -p $LOGSHIPDIR +DB1=slony_test1 +DB2=slony_test +CLUSTERNAME=T1 +PGBENCH_SCALE=1 +PGBENCH_CLIENTS=5 +PGBENCH_TRANS=`expr 30000 / $PGBENCH_CLIENTS` +DEBUGLEVEL=4 + +trap ' + echo "" + echo "**** user abort" + if [ ! -z $pgbench_pid ] ; then + echo "**** killing pgbench" + kill -15 $pgbench_pid + fi + if [ ! -z $slon1_pid ] ; then + echo "**** killing node daemon 1" + kill -15 $slon1_pid + fi + if [ ! -z $slon2_pid ] ; then + echo "**** killing node daemon 2" + kill -15 $slon2_pid + fi + exit 1 +' 2 15 + +###################################################################### +# Preparations ... create a standalone pgbench database and +# have the "application" (pgbench) running. +###################################################################### + +##### +# Make sure the install is up to date +##### +WGM=`which gmake` +if [ -z $WGM ] ; then + MAKE=make + CGNU=`make -v | grep GNU` + if [ -z $CGNU ] ; then + echo "GNU Make not found - please install GNU Make" + exit 1 + fi +else + MAKE=gmake +fi +echo -n "**** running 'make install' in src directory ... " +if ! ${MAKE} -C .. 
install >$TMPOUT 2>&1 ; then + echo "failed"; cat $TMPOUT; rm $TMPOUT; exit 1 +fi +echo "done" +rm $TMPOUT + +PREAMBLE_FILE=/tmp/preamble.$$ +cat <<EOF > $PREAMBLE_FILE +define origin 11; +define sub1 22; +cluster name = $CLUSTERNAME; +node @origin admin conninfo='dbname=$DB1'; +node @sub1 admin conninfo='dbname=$DB2'; +EOF + + +##### +# Remove old databases, if they exist +##### +echo "**** remove old test databases" +dropdb $DB1 || echo "**** ignored" +sleep 1 +dropdb $DB2 || echo "**** ignored" +sleep 1 + +##### +# Create the "Primary Node" +##### +echo "**** creating database for Node 11" + +createdb $DB1 || exit 1 +pgbench -i -s $PGBENCH_SCALE $DB1 +pg_dump -s $DB1 >pgbench_schema.sql + +##### +# Start pgbench in the background and give it rampup time +##### +pgbench -n -s $PGBENCH_SCALE -c $PGBENCH_CLIENTS -t $PGBENCH_TRANS $DB1 & +pgbench_pid=$! +echo "**** pgbench is running in background with pid $pgbench_pid" +echo -n "**** sleeping 10 seconds to give pgbench time for rampup ... " +sleep 10 +echo "done" + +echo "" +echo "**********************************************************************" +echo "**** $DB1 is now a standalone database with a running pgbench" +echo "**********************************************************************" +echo "" + +###################################################################### +# Setup DB1 as the primary cluster T1 node, start the node daemon, +# and create a replication set containing the pgbench tables. +###################################################################### + +echo "**** initializing $DB1 as Primary Node for Slony-I cluster $CLUSTERNAME" +slonik <<_EOF_ + include <$PREAMBLE_FILE>; + init cluster (id = @origin, comment = 'Node @origin'); + echo 'Database $DB1 initialized as Node 11'; +_EOF_ +if [ $? 
-ne 0 ] ; then + kill $pgbench_pid; + exit 1 +fi + +echo "**** starting the Slony-I node daemon for $DB1" +xterm -title "Slon node 11" -e sh -c "slon -d$DEBUGLEVEL -s500 -g10 $CLUSTERNAME dbname=$DB1; echo -n 'Enter>'; read line" & +slon1_pid=$! +echo "slon[$slon1_pid] on dbname=$DB1" + +echo "**** creating a replication set containing the 4 pgbench tables ... " +slonik <<_EOF_ + include <$PREAMBLE_FILE>; + try { + table add key (node id = @origin, fully qualified name = 'public.history'); + } + on error { + exit 1; + } + + try { + create set (id = 1, origin = @origin, comment = 'Set 1 - pgbench tables'); + set add table (set id = 1, origin = @origin, + id = 1, fully qualified name = 'public.accounts', + comment = 'Table accounts'); + set add table (set id = 1, origin = @origin, + id = 2, fully qualified name = 'public.branches', + comment = 'Table branches'); + set add table (set id = 1, origin = @origin, + id = 3, fully qualified name = 'public.tellers', + comment = 'Table tellers'); + set add table (set id = 1, origin = @origin, + id = 4, fully qualified name = 'public.history', + key = serial, comment = 'Table accounts'); + } + on error { + exit 1; + } +_EOF_ + +if [ $? -ne 0 ] ; then + echo "failed" + kill $pgbench_pid 2>/dev/null + kill $slon1_pid 2>/dev/null + cat $TMPOUT + rm $TMPOUT + exit 1 +fi +echo "**** set created" + +##### +# Check that pgbench is still running +##### +if ! kill -0 $pgbench_pid 2>/dev/null ; then + echo "**** pgbench terminated ???" 
+ kill $slon1_pid 2>/dev/null + exit 1 +fi + +echo "" +echo "**********************************************************************" +echo "**** $DB1 is now the Slony-I origin for set 1" +echo "**********************************************************************" +echo "" + +###################################################################### +# Setup DB2 as a subscriber node and let it subscribe the replication +# set of the running pgbench +###################################################################### +echo "**** creating database for node 22" +if ! createdb $DB2 ; then + kill $pgbench_pid 2>/dev/null + kill $slon1_pid 2>/dev/null + exit 1 +fi + +echo "**** initializing $DB2 as node 22 of Slony-I cluster $CLUSTERNAME" +slonik <<_EOF_ + include <$PREAMBLE_FILE>; + echo 'Creating node 22'; + try { + store node (id = @sub1, comment = 'node @sub1', event node = @origin); + } on error { + echo 'could not establish node @sub1'; + exit -1; + } + try { + store path (server = @origin, client = @sub1, conninfo = 'dbname=$DB1'); + store path (server = @sub1, client = @origin, conninfo = 'dbname=$DB2'); + } + on error { + echo 'could not establish paths between @origin and @sub1'; + exit -1; + } + echo 'Database $DB2 added as node @sub1'; +_EOF_ +if [ $? -ne 0 ] ; then + kill $pgbench_pid 2>/dev/null + kill $slon1_pid 2>/dev/null + exit 1 +fi + +echo "**** starting the Slony-I node daemon for $DB1" +xterm -title "Slon node 22" -e sh -c "slon -d$DEBUGLEVEL -s10000 -o10000 -g10 -a $LOGSHIPDIR $CLUSTERNAME dbname=$DB2; echo -n 'Enter>'; read line" & +slon2_pid=$! +echo "slon[$slon2_pid] on dbname=$DB2" + +##### +# Check that pgbench is still running +##### +if ! kill -0 $pgbench_pid 2>/dev/null ; then + echo "**** pgbench terminated ???" 
+ kill $slon1_pid 2>/dev/null + exit 1 +fi + +###################################################################### +# And now comes the moment where the big elephant starts to pee +# and the attendants in the first row climb on their chairs ... +###################################################################### +echo "**** creating pgbench tables and subscribing node 22 to set 1" +( + cat pgbench_schema.sql +) | psql -q $DB2 +slonik <<_EOF_ + include <$PREAMBLE_FILE>; + subscribe set ( id = 1, provider = @origin, receiver = @sub1, forward = yes ); +_EOF_ + +echo "" +echo "**********************************************************************" +echo "**** $DB2 should now be copying data and attempting to catch up." +echo "**********************************************************************" +echo "" + +echo -n "**** waiting for pgbench to finish " +while kill -0 $pgbench_pid 2>/dev/null ; do + echo -n "." + sleep 10 +done +echo "**** pgbench finished" +echo "**** please terminate the replication engines when caught up." +wait $slon1_pid +wait $slon2_pid + +kill $pgbench_pid 2>/dev/null +kill $slon1_pid 2>/dev/null +kill $slon2_pid 2>/dev/null + +echo -n "**** comparing databases ... 
" +psql $DB1 >dump.tmp.1.$$ <<_EOF_ + select 'accounts:'::text, aid, bid, abalance, filler + from accounts order by aid; + select 'branches:'::text, bid, bbalance, filler + from branches order by bid; + select 'tellers:'::text, tid, bid, tbalance, filler + from tellers order by tid; + select 'history:'::text, tid, bid, aid, delta, mtime, filler, + "_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID"; +_EOF_ +psql $DB2 >dump.tmp.2.$$ <<_EOF_ + select 'accounts:'::text, aid, bid, abalance, filler + from accounts order by aid; + select 'branches:'::text, bid, bbalance, filler + from branches order by bid; + select 'tellers:'::text, tid, bid, tbalance, filler + from tellers order by tid; + select 'history:'::text, tid, bid, aid, delta, mtime, filler, + "_Slony-I_${CLUSTERNAME}_rowID" from history order by "_Slony-I_${CLUSTERNAME}_rowID"; +_EOF_ + +if diff dump.tmp.1.$$ dump.tmp.2.$$ >test_1.diff ; then + echo "success - databases are equal." + rm dump.tmp.?.$$ + rm test_1.diff +else + echo "FAILED - see test_1.diff for database differences" +fi +rm $PREAMBLE_FILE Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.76 retrieving revision 1.77 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.76 -r1.77 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -249,7 +249,9 @@ static int generate_archive_header (int node_id, char *seqbuf); static int submit_query_to_archive(SlonDString *ds); static int submit_string_to_archive (const char *s); +static int submit_raw_data_to_archive (const char *s); static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf); +static int write_void_log (int node_id, char *seqbuf, const char *message); #define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); @@ -587,6 +589,9 
@@ no_id, no_comment, no_spool); need_reloadListen = true; + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE"); + } else if (strcmp(event->ev_type, "ENABLE_NODE") == 0) { @@ -601,6 +606,9 @@ no_id); need_reloadListen = true; + + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE"); } else if (strcmp(event->ev_type, "DROP_NODE") == 0) { @@ -650,6 +658,8 @@ rtcfg_cluster_name); need_reloadListen = true; + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE"); } else if (strcmp(event->ev_type, "STORE_PATH") == 0) { @@ -667,6 +677,8 @@ pa_server, pa_client, pa_conninfo, pa_connretry); need_reloadListen = true; + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH"); } else if (strcmp(event->ev_type, "DROP_PATH") == 0) { @@ -682,6 +694,8 @@ pa_server, pa_client); need_reloadListen = true; + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH"); } else if (strcmp(event->ev_type, "STORE_LISTEN") == 0) { @@ -696,6 +710,8 @@ "select %s.storeListen_int(%d, %d, %d); ", rtcfg_namespace, li_origin, li_provider, li_receiver); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN"); } else if (strcmp(event->ev_type, "DROP_LISTEN") == 0) { @@ -710,6 +726,8 @@ "select %s.dropListen_int(%d, %d, %d); ", rtcfg_namespace, li_origin, li_provider, li_receiver); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN"); } else if (strcmp(event->ev_type, "STORE_SET") == 0) { @@ -724,6 +742,9 @@ "select %s.storeSet_int(%d, %d, '%q'); ", rtcfg_namespace, set_id, set_origin, set_comment); + + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET"); } else if (strcmp(event->ev_type, "DROP_SET") == 0) { @@ -734,18 +755,46 @@ slon_appendquery(&query1, "select %s.dropSet_int(%d); ", rtcfg_namespace, set_id); + + /* The table deleted needs to be + * dropped from log shipping too */ + if (archive_dir) { + rc = 
open_log_archive(rtcfg_nodeid, seqbuf); + rc = generate_archive_header(rtcfg_nodeid, seqbuf); + slon_mkquery(&query1, + "delete from %s.sl_setsync_offline " + " where ssy_setid= %d;", + rtcfg_namespace, set_id); + rc = submit_query_to_archive(&query1); + rc = close_log_archive(); + } } else if (strcmp(event->ev_type, "MERGE_SET") == 0) { int set_id = (int)strtol(event->ev_data1, NULL, 10); int add_id = (int)strtol(event->ev_data2, NULL, 10); - + char dropquery[280]; rtcfg_dropSet(add_id); slon_appendquery(&query1, "select %s.mergeSet_int(%d, %d); ", rtcfg_namespace, set_id, add_id); + + /* Log shipping gets the change here + * that we need to delete the table + * being merged from the set being + * maintained. */ + if (archive_dir) { + rc = open_log_archive(rtcfg_nodeid, seqbuf); + rc = generate_archive_header(rtcfg_nodeid, seqbuf); + rc = slon_mkquery(&query1, + "delete from %s.sl_setsync_offline " + " where ssy_setid= %d;", + rtcfg_namespace, add_id); + rc = submit_query_to_archive(&query1); + rc = close_log_archive(); + } } else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0) { @@ -754,6 +803,8 @@ * subscribed sets yet and table information is not maintained * in the runtime configuration. */ + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE"); } else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0) { @@ -762,6 +813,8 @@ * subscribed sets yet and sequences information is not * maintained in the runtime configuration. 
*/ + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE"); } else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0) { @@ -770,6 +823,8 @@ slon_appendquery(&query1, "select %s.setDropTable_int(%d);", rtcfg_namespace, tab_id); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE"); } else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0) { @@ -778,6 +833,8 @@ slon_appendquery(&query1, "select %s.setDropSequence_int(%d);", rtcfg_namespace, seq_id); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE"); } else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0) { @@ -787,6 +844,8 @@ slon_appendquery(&query1, "select %s.setMoveTable_int(%d, %d);", rtcfg_namespace, tab_id, new_set_id); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE"); } else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0) { @@ -796,6 +855,8 @@ slon_appendquery(&query1, "select %s.setMoveSequence_int(%d, %d);", rtcfg_namespace, seq_id, new_set_id); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE"); } else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0) { @@ -806,6 +867,8 @@ "select %s.storeTrigger_int(%d, '%q'); ", rtcfg_namespace, trig_tabid, trig_tgname); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER"); } else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0) { @@ -816,6 +879,70 @@ "select %s.dropTrigger_int(%d, '%q'); ", rtcfg_namespace, trig_tabid, trig_tgname); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER"); + } + else if (strcmp(event->ev_type, "ACCEPT_SET") == 0) + { + int set_id = (int) strtol(event->ev_data1, NULL, 10); + int old_origin = (int) strtol(event->ev_data2, NULL, 10); + int new_origin = (int) strtol(event->ev_data3, NULL, 10); + int seq_no = (int) strtol(event->ev_data4, NULL, 10); + PGresult *res; + + /* If we're a remote node, and haven't yet + * received the MOVE_SET 
event from the + * new origin, then we'll need to sleep a + * bit... This avoids a race condition + * where new SYNCs take place on the new + * origin, and are ignored on some + * subscribers (and their children) + * because the MOVE_SET wasn't yet + * received and processed */ + + if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) { + slon_mkquery(&query1, + "select 1 from %s.sl_event accept " + "where " + " accept.ev_type = 'ACCEPT_SET' and " + " accept.ev_origin = %d and " + " accept.ev_data1 = %d and " + " accept.ev_data2 = %d and " + " accept.ev_data3 = %d and " + " accept.ev_data4 = %d and " + " not exists " + " (select 1 from %s.sl_event move " + " where " + " accept.ev_origin = move.ev_data3 and " + " move.ev_type = 'MOVE_SET' and " + " move.ev_data1 = accept.ev_data1 and " + " move.ev_data2 = accept.ev_data2 and " + " move.ev_data3 = accept.ev_data3 and " + " move.ev_seqno = %d); ", + + rtcfg_namespace, + old_origin, set_id, old_origin, new_origin, seq_no, + rtcfg_namespace, seq_no); + res = PQexec(local_dbconn, dstring_data(&query1)); + while (PQntuples(res) > 0) { + int sleeptime = 15; + int sched_rc; + slon_log(SLON_WARN, "remoteWorkerThread_%d: " + "accept set: node has not yet received MOVE_SET event " + "for set %d old origin %d new origin - sleep %d seconds\n", + rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime); + sched_rc = sched_msleep(node, sleeptime * 1000); + if (sched_rc != SCHED_STATUS_OK) { + event_ok = false; + break; + } else { + if (sleeptime < 60) + sleeptime *= 2; + } + res = PQexec(local_dbconn, dstring_data(&query1)); + } + } + } else if (strcmp(event->ev_type, "MOVE_SET") == 0) { @@ -831,6 +958,7 @@ * that, we need to execute it now and select the resulting * provider for us. 
*/ + slon_appendquery(&query1, "select %s.moveSet_int(%d, %d, %d); ", rtcfg_namespace, @@ -880,6 +1008,8 @@ rtcfg_namespace, failed_node, backup_node, set_id); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET"); need_reloadListen = true; } else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0) @@ -896,7 +1026,8 @@ "select %s.subscribeSet_int(%d, %d, %d, '%q'); ", rtcfg_namespace, sub_set, sub_provider, sub_receiver, sub_forward); - + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET"); need_reloadListen = true; } else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0) @@ -1004,7 +1135,9 @@ rtcfg_namespace, sub_set, sub_provider, sub_receiver); } - + /* Note: No need to do anything based + on archive_dir here; copy_set does + that nicely already. */ need_reloadListen = true; } else if (strcmp(event->ev_type, "UNSUBSCRIBE_SET") == 0) @@ -1022,6 +1155,16 @@ sub_set, sub_receiver); need_reloadListen = true; + if (archive_dir) { + rc = open_log_archive(rtcfg_nodeid, seqbuf); + rc = generate_archive_header(rtcfg_nodeid, seqbuf); + slon_mkquery(&query1, + "delete from %s.sl_setsync_offline " + " where ssy_setid= %d;", + rtcfg_namespace, sub_set); + rc = submit_query_to_archive(&query1); + rc = close_log_archive(); + } } else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) { @@ -1087,11 +1230,15 @@ "select %s.updateReloid(%d, '%q', %d); ", rtcfg_namespace, reset_config_setid, reset_configonly_on_node); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- RESET_CONFIG"); } else { printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n", node->no_id, event->ev_origin, event->ev_seqno, event->ev_type); + if (archive_dir) + write_void_log (rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!"); } /* @@ -2107,6 +2254,33 @@ dstring_init(&query1); sprintf(seqbuf, INT64_FORMAT, event->ev_seqno); + + /* Log Shipping Support begins... 
*/ + /* - Open the log, put the header in + Isn't it convenient that seqbuf was just populated??? :-) + */ + if (archive_dir) { + rc = open_log_archive(rtcfg_nodeid, seqbuf); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not open COPY SET archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + rc = generate_archive_header(rtcfg_nodeid, seqbuf); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not generate COPY SET archive header %s - %s", + node->no_id, archive_tmp, strerror(errno)); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + } /* * Listen on the special relation telling what node daemon this connection * belongs to. @@ -2118,6 +2292,7 @@ { slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } @@ -2147,6 +2322,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } if (*(PQgetvalue(res1, 0, 0)) == 't') @@ -2157,6 +2333,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } PQclear(res1); @@ -2170,6 +2347,7 @@ { slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } } @@ -2199,6 +2377,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ntuples1 = PQntuples(res1); @@ -2236,6 +2415,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2259,6 +2439,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2279,6 +2460,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + 
terminate_log_archive(); return -1; } slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " @@ -2314,6 +2496,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } @@ -2334,6 +2517,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ntuples2 = PQntuples(res2); @@ -2348,6 +2532,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } } @@ -2356,6 +2541,10 @@ /* * Begin a COPY from stdin for the table on the local DB */ + slon_log(SLON_DEBUG4, "remoteWorkerThread_%d: " + "Begin COPY of table %s\n", + node->no_id, tab_fqname); + slon_mkquery(&query1, "select %s.truncateTable('%s'); " "copy %s from stdin; ", @@ -2372,9 +2561,25 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } + if (archive_dir) { + slon_log(SLON_DEBUG4, "start log ship copy of %s\n", tab_fqname); + slon_mkquery(&query1, + "delete from %s;copy %s from stdin;", tab_fqname, tab_fqname); + rc = submit_query_to_archive(&query1); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_d: " + "Could not generate copy_set request for %s - %s", + node->no_id, tab_fqname, strerror(errno)); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + } /* * Begin a COPY to stdout for the table on the provider DB */ @@ -2398,6 +2603,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } @@ -2410,7 +2616,6 @@ int len = strlen(copydata); copysize += (int64) len; - if (PQputCopyData(loc_dbconn, copydata, len) != 1) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " @@ -2426,8 +2631,30 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } + if (archive_dir) { + rc = fwrite(copydata, 1, len, archive_fp); + if (rc != len) { + 
slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "PQputCopyData() - log shipping - %s", + node->no_id, strerror(errno)); +#ifdef SLON_MEMDEBUG + memset(copydata, 88, len); +#endif + PQfreemem(copydata); + PQputCopyEnd(loc_dbconn, "Slony-I: copy set operation"); + PQclear(res3); + PQclear(res2); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + + } + } #ifdef SLON_MEMDEBUG memset(copydata, 88, len); #endif @@ -2444,6 +2671,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } @@ -2464,6 +2692,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } PQclear(res3); @@ -2480,6 +2709,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } res2 = PQgetResult(loc_dbconn); @@ -2493,8 +2723,12 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } + if (archive_dir) { + rc = submit_string_to_archive("\\."); + } #else /* ! 
HAVE_PQPUTCOPYDATA */ copydone = false; while (!copydone) @@ -2517,16 +2751,22 @@ case 0: PQputline(loc_dbconn, copybuf); PQputline(loc_dbconn, "\n"); + if (archive_dir) + submit_string_to_archive(copybuf); break; case 1: PQputline(loc_dbconn, copybuf); + if (archive_dir) + submit_raw_data_to_archive(copybuf); break; } } } PQputline(loc_dbconn, "\\.\n"); - + if (archive_dir) { + rc = submit_string_to_archive("\\\."); + } /* * End the COPY to stdout on the provider */ @@ -2541,6 +2781,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } PQclear(res3); @@ -2562,6 +2803,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } #endif /* HAVE_PQPUTCOPYDATA */ @@ -2580,8 +2822,13 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } + if (archive_dir) { + submit_query_to_archive(&query1); + } + gettimeofday(&tv_now, NULL); slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " "%.3f seconds to copy table %s\n", @@ -2616,6 +2863,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ntuples1 = PQntuples(res1); @@ -2638,6 +2886,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } } @@ -2670,6 +2919,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ntuples1 = PQntuples(res1); @@ -2683,6 +2933,8 @@ "set last_value of sequence %s (%s) to %s\n", node->no_id, seql_seqid, seq_fqname, seql_last_value); + + /* * sequence with ID 0 is a nodes rowid ... only remember in seqlog. 
*/ @@ -2691,6 +2943,10 @@ slon_mkquery(&query1, "select \"pg_catalog\".setval('%q', '%s'); ", seq_fqname, seql_last_value); + + if (archive_dir) { + submit_query_to_archive(&query1); + } } else dstring_reset(&query1); @@ -2706,6 +2962,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } } @@ -2749,6 +3006,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } if (PQntuples(res1) != 1) @@ -2759,6 +3017,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } if (PQgetisnull(res1, 0, 0)) @@ -2805,6 +3064,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } if (PQntuples(res1) != 1) @@ -2815,6 +3075,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ssy_seqno = PQgetvalue(res1, 0, 0); @@ -2859,6 +3120,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } ntuples1 = PQntuples(res2); @@ -2899,6 +3161,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } if (PQntuples(res1) != 1) @@ -2909,6 +3172,7 @@ PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } dstring_init(&ssy_action_list); @@ -2937,14 +3201,45 @@ { slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); + return -1; + } + if (archive_dir) { + slon_mkquery(&query1, + "insert into %s.sl_setsync_offline () " + "values ('%d', '%d');", + rtcfg_namespace, set_id, ssy_seqno); + rc = submit_query_to_archive(&query1); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + " could not insert to sl_setsync_offline", + node->no_id); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); return -1; } + } 
gettimeofday(&tv_now, NULL); slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " "%.3f seconds to build initial setsync status\n", node->no_id, TIMEVAL_DIFF(&tv_start2, &tv_now)); + if (archive_dir) { + rc = close_log_archive(); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + " could not close archive log %s - %s", + node->no_id, archive_tmp, strerror(errno)); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + } + + /* * Roll back the transaction we used on the provider and close the * database connection. @@ -2954,6 +3249,7 @@ { slon_disconnectdb(pro_conn); dstring_free(&query1); + terminate_log_archive(); return -1; } slon_disconnectdb(pro_conn); @@ -2966,11 +3262,9 @@ gettimeofday(&tv_now, NULL); slon_log(SLON_DEBUG1, "copy_set %d done in %.3f seconds\n", set_id, TIMEVAL_DIFF(&tv_start, &tv_now)); - return 0; } - static int sync_event(SlonNode * node, SlonConn * local_conn, WorkerGroupData * wd, SlonWorkMsg_event * event) @@ -4219,9 +4513,11 @@ int close_log_archive () { int rc; + if (archive_dir) { rc = fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n"); rc = fclose(archive_fp); rc = rename(archive_tmp, archive_name); + } return rc; } @@ -4231,13 +4527,18 @@ } int submit_query_to_archive(SlonDString *ds) { - return fprintf(archive_fp, "%s\n", *ds->data); + return fprintf(archive_fp, "%s\n", ds->data); } int submit_string_to_archive (const char *s) { return fprintf(archive_fp, "%s\n", s); } +/* Raw form used for COPY where we don't want any extra cr/lf output */ +int submit_raw_data_to_archive (const char *s) { + return fprintf(archive_fp, "%s", s); +} + void terminate_log_archive () { if (archive_fp) { fclose(archive_fp); @@ -4248,17 +4549,20 @@ time_t now; now = time(NULL); return fprintf(archive_fp, - "-- Slony-I sync log\n" + "-- Slony-I log shipping 
archive\n" "-- Node %d, Event %s\n" "-- at... %s\n" "start transaction;\n", node_id, seqbuf, ctime(&now)); } -/* - * Local Variables: - * tab-width: 4 - * c-indent-level: 4 - * c-basic-offset: 4 - * End: - */ +/* write_void_log() writes out a "void" log consisting of the message + * which must either be a valid SQL query or a SQL comment. */ + +int write_void_log (int node_id, char *seqbuf, const char *message) { + open_log_archive(node_id, seqbuf); + generate_archive_header(node_id, seqbuf); + submit_string_to_archive(message); + close_log_archive(); +} + Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.45 retrieving revision 1.46 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.45 -r1.46 --- src/slon/slon.h +++ src/slon/slon.h @@ -50,7 +50,7 @@ #define SLON_CLEANUP_SLEEP 600 /* sleep 10 minutes between */ /* cleanup calls */ -#define SLON_VACUUM_FREQUENCY 1 /* vacuum every 3rd cleanup */ +#define SLON_VACUUM_FREQUENCY 3 /* vacuum every 3rd cleanup */ typedef enum Index: cleanup_thread.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/cleanup_thread.c,v retrieving revision 1.19 retrieving revision 1.20 diff -Lsrc/slon/cleanup_thread.c -Lsrc/slon/cleanup_thread.c -u -w -r1.19 -r1.20 --- src/slon/cleanup_thread.c +++ src/slon/cleanup_thread.c @@ -32,7 +32,8 @@ * ---------- Global data ---------- */ int vac_frequency = SLON_VACUUM_FREQUENCY; - +static unsigned long earliest_xid = 0; +static unsigned long get_earliest_xid (PGconn *dbconn); /* * ---------- cleanupThread_main * @@ -55,6 +56,7 @@ int n , t; int vac_count = 0; + char *vacuum_action; slon_log(SLON_DEBUG1, "cleanupThread: thread starts\n"); @@ -166,28 +168,44 @@ */ if (vac_frequency != 0 && ++vac_count >= vac_frequency) { + unsigned long latest_xid; vac_count = 0; - + latest_xid = get_earliest_xid(dbconn); 
+ if (earliest_xid != latest_xid) { + vacuum_action = "vacuum analyze"; + } else { + vacuum_action = "analyze"; + slon_log(SLON_DEBUG4, "cleanupThread: xid %d still active - analyze instead\n", + earliest_xid); + } + earliest_xid = latest_xid; /* * Build the query string for vacuuming replication runtime data * and event tables */ dstring_init(&query3); slon_mkquery(&query3, - "vacuum analyze %s.sl_event; " - "vacuum analyze %s.sl_confirm; " - "vacuum analyze %s.sl_setsync; " - "vacuum analyze %s.sl_log_1; " - "vacuum analyze %s.sl_log_2;" - "vacuum analyze %s.sl_seqlog;" - "vacuum analyze pg_catalog.pg_listener;", + "%s %s.sl_event; " + "%s %s.sl_confirm; " + "%s %s.sl_setsync; " + "%s %s.sl_log_1; " + "%s %s.sl_log_2;" + "%s %s.sl_seqlog;" + "%s pg_catalog.pg_listener;", + vacuum_action, rtcfg_namespace, + vacuum_action, rtcfg_namespace, + vacuum_action, rtcfg_namespace, + vacuum_action, rtcfg_namespace, + vacuum_action, rtcfg_namespace, - rtcfg_namespace); - + vacuum_action, + rtcfg_namespace, + vacuum_action + ); gettimeofday(&tv_start, NULL); res = PQexec(dbconn, dstring_data(&query3)); @@ -231,3 +249,41 @@ slon_log(SLON_DEBUG1, "cleanupThread: thread done\n"); pthread_exit(NULL); } + + +static unsigned long get_earliest_xid (PGconn *dbconn) { + unsigned long lo = 2147483647; + unsigned long minhi = -1; + unsigned long minlo = lo; + unsigned long xid; + long n,t; + PGresult *res; + SlonDString query1; + dstring_init(&query1); + slon_mkquery(&query1, "select transaction from pg_catalog.pg_locks where transaction is not null;"); + res = PQexec(dbconn, dstring_data(&query1)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + slon_log(SLON_FATAL, "cleanupThread: could not read locks from pg_locks!"); + PQclear(res); + slon_abort(); + return -1; + } else { + n = PQntuples(res); + for (t = 0; t < n; t++) { + xid = atoi(PQgetvalue(res, t, 0)); + printf ("xid: %d\n", xid); + if (xid > lo) { + if (xid < minlo) + minlo = xid; + } else { + if (xid < minhi) + minhi = xid; 
+ } + } + } + printf("minhi: %d minlo: %d\n", minlo, minhi); + if ((minhi - lo) < minlo) + return minlo; + else + return minhi; +}
- Previous message: [Slony1-commit] By cbbrowne: Added scanner support for include and define Flex scanner
- Next message: [Slony1-commit] By darcyb: Add new conf option: sql_on_connection.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list