Tue Nov 30 23:38:52 PST 2004
- Previous message: [Slony1-commit] By cbbrowne: Added in some code to process syncs; still very rudimentary
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
The ongoing effort to understand log spooling...
Added Files:
-----------
slony1-engine/src/slonspool:
gen_output.pm (r1.1)
init.pm (r1.1)
slonspool.pl (r1.1)
subscribe.pm (r1.1)
Removed Files:
-------------
slony1-engine/src/slonspool:
slonspool
-------------- next part --------------
--- /dev/null
+++ src/slonspool/subscribe.pm
@@ -0,0 +1,92 @@
+#!/usr/bin/perl
+# $Id: subscribe.pm,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $
+# Author: Christopher Browne
+# Copyright 2004 Afilias Canada
+
# Write a complete subscription script for every set in @SETS into
# $spoolpath/subscription.log: schema creation, table DDL (via pg_dump),
# table data (via COPY), sequence DDL, and the sl_setsync bookkeeping row.
# Relies on the package globals $dbh, $spoolpath, $node, $host, $port,
# $database, $user, $pgbins, $cluster and @SETS set up by the caller.
sub subscribe_to_node {
    my $logfile = "$spoolpath/subscription.log";
    open(SUBSCRIBE, ">$logfile")
        or die "cannot create $logfile: $!";
    foreach $set (@SETS) {

        # Create namespaces used by the set's tables.
        # Fixed: the Slony-I schema was hard-coded as _oxrsorg; use the
        # configured cluster name instead, as the rest of the code does.
        print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
        my $sth = $dbh->exec(qq{select distinct nspname from pg_class p, pg_namespace n, "_$cluster".sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;});
        while ( @row = $sth->fetchrow ) {
            my ($namespace) = @row;
            print SUBSCRIBE "create schema $namespace;\n";
        }
        # pg_dump appends directly to the log file below, so close our
        # handle first and reopen in append mode afterwards.
        close(SUBSCRIBE);

        # Create tables (schema only) for every table in the set.
        $sth = $dbh->exec(qq{select nspname || '.' || relname from pg_class p, pg_namespace n, "_$cluster".sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;});
        while ( @row = $sth->fetchrow ) {
            my ($table) = @row;
            `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $logfile`;
        }
        open(SUBSCRIBE, ">>$logfile")
            or die "cannot append to $logfile: $!";

        # Pull data, as in copy_set (remote_worker.c).  A serializable
        # transaction gives one consistent snapshot for all tables.
        $dbh->exec("begin transaction; set transaction isolation level serializable;");
        my $tquery = qq{
              select T.tab_id,
                     "pg_catalog".quote_ident(PGN.nspname) || '.' ||
                     "pg_catalog".quote_ident(PGC.relname) as tab_fqname,
                     T.tab_idxname, T.tab_comment
              from "_$cluster".sl_table T,
                   "pg_catalog".pg_class PGC,
                   "pg_catalog".pg_namespace PGN
              where T.tab_set = $set
                and T.tab_reloid = PGC.oid
                and PGC.relnamespace = PGN.oid
              order by tab_id;
        };
        # Fixed: this query used to be issued *after* the foreach loop,
        # where both $tquery and $set were already out of scope, so no
        # data was ever copied.  It now runs once per set.
        $sth = $dbh->exec($tquery);
        while (@row = $sth->fetchrow) {
            # Fixed: the fully-qualified table name is the second column
            # (tab_fqname), not the first (tab_id).
            my ($tab_id, $table) = @row;
            print SUBSCRIBE qq{copy "$table" from stdin;\n};
            $dbh->exec(qq{copy "$table" to stdout;});
            my $line = "*" x 16384;
            my $ret = $dbh->getline($line, 16384);
            while ($line ne "\\.") {
                # Fixed: "line" was a bareword here; print the variable.
                print SUBSCRIBE $line, "\n";
                $ret = $dbh->getline($line, 16384);
            }
            $dbh->endcopy;    # finish the COPY protocol (PQendcopy)
            # Fixed: "\." in a double-quoted string is just "."; the
            # end-of-copy marker needs a literal backslash.
            print SUBSCRIBE "\\.\n";
        }
        $dbh->exec("commit;");    # release the serializable snapshot
        close SUBSCRIBE;          # pg_dump appends the sequences next

        # Dump sequence definitions for the set.
        # Fixed: pg_namespace had no "n" alias, so the n.oid joins failed.
        my $seqquery = qq{
            select n.nspname, c.relname
            from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "_$cluster".sl_sequence s
            where
               n.oid = c.relnamespace and
               c.oid = s.seq_reloid and
               s.seq_set = $set;};
        $sth = $dbh->exec($seqquery);
        while (my @row = $sth->fetchrow) {
            my ($nsp, $seqname) = @row;
            `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $logfile`;
        }

        # Next, populate Sync information: use the last SYNC's snapshot
        # information and set the action sequence list to all actions
        # after that.
        my $squery = qq{
            select ssy_seqno, ssy_minxid, ssy_maxxid,
                   ssy_xip, ssy_action_list
            from "_$cluster".sl_setsync
            where ssy_setid = $set; };
        # Fixed: these were declared with "my" inside the fetch loop, so
        # they were always undef by the time the insert was generated.
        my ($seqno, $minxid, $maxxid, $xip, $actionlist);
        $sth = $dbh->exec($squery);
        while (my @row = $sth->fetchrow) {
            ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row;
        }
        # Fixed: the handle was closed above; reopen before printing.
        open(SUBSCRIBE, ">>$logfile")
            or die "cannot append to $logfile: $!";
        my $createsync = qq{
            insert into "_$cluster".sl_setsync
            (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)
            values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');};
        print SUBSCRIBE $createsync, "\n";
    }
    close SUBSCRIBE;
}
+
+1;
--- /dev/null
+++ src/slonspool/slonspool.pl
@@ -0,0 +1,185 @@
+#!/usr/bin/perl
+# $Id: slonspool.pl,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $
+# Author: Christopher Browne
+# Copyright 2004 Afilias Canada
+
+use Pg;
+use Getopt::Long;
+require "subscribe.pm";
+require "gen_output.pm";
+require "init.pm";
+
# Script-level configuration, populated by process_options().
my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname,
   $maxsize, $maxage, $subnode, $node);

my @SETS;   # set ids to replicate (from --sets)
my $dbh;    # shared connection, established by connect_to_node()

process_options();
# Fixed: init.pm defines load_configuration(), not initialize_configuration();
# the old name died with "Undefined subroutine" at startup.
load_configuration();
#subscribe_to_node();   # NOTE(review): requires connect_to_node() first to set $dbh

# Main loop: poll for and spool new SYNC events forever.
while (1) {
    listen_to_node();
}
+
# Drive the event-spooling loop by calling process_event() repeatedly.
# NOTE(review): the "die -1" below aborts the process after a single
# process_event() call, so neither this while(1) nor the caller's loop
# ever iterates -- this looks like leftover debugging scaffolding (the
# removed slonspool script slept 2 seconds here instead).  Confirm
# before relying on continuous spooling.
sub listen_to_node {
    while (1) {
        process_event();
        die -1;
    }
}
+
# Process one SYNC event: connect to the provider database, compute the
# next sync sequence number from sl_confirm, select the relevant
# sl_log_1 rows through a cursor, spool them as SQL statements into
# $spoolpath/log-<seqno>, and finally insert a confirmation row into
# sl_confirm.  Uses the connection/config globals set by process_options().
sub process_event {
    my $dsn = "dbname=$database host=$host port=$port user=$user";
    if ($password) {
        $dsn .= " password=$password";
    }
    print "DSN: $dsn\n";
    my $dbh = Pg::connectdb($dsn);
    print "Last err:", $dbh->errorMessage, "\n";

    # Last confirmed sync for our origin node; the next one is +1.
    my $sync_event;
    my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
                 where con_origin = $node order by con_seqno desc limit 1;};
    print $last_seq, "\n";
    my $res = $dbh->exec($last_seq);
    while (my @row = $res->fetchrow) {
        ($sync_event) = @row;
        print "Last sync: $sync_event\n";
    }
    print "Last err:", $dbh->errorMessage, "\n";
    $sync_event++;
    print "Next sync: $sync_event\n";

    # Same comma-separated list process_options() parsed into @SETS.
    my $setlist = join(',', @SETS);

    my @ORIGINS;
    my $origin_query = qq{ select set_origin from "_$cluster".sl_set where set_id in ($setlist); };
    $res = $dbh->exec($origin_query);
    while (my @row = $res->fetchrow) {
        my ($origin) = @row;
        push @ORIGINS, $origin;
    }
    my $origin_qualification = " (log_origin in (" . join(',', @ORIGINS) . ")) ";

    # Fixed: @TABLES and @TABLENAME were never populated, which made the
    # IN () list below a SQL syntax error and left the spooled DML with
    # undef table names.  Load both from sl_table for the chosen sets.
    my (@TABLES, @TABLENAME);
    my $tables_query = qq{select t.tab_id, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace and t.tab_set in ($setlist);};
    $res = $dbh->exec($tables_query);
    while (my @row = $res->fetchrow) {
        my ($tabid, $namespace, $tname) = @row;
        push @TABLES, $tabid;
        $TABLENAME[$tabid] = qq{"$namespace".$tname};
    }

    my $table_qualification = " (log_tableid in (" . join(',', @TABLES) . ")) ";
    print "Table qualification: $table_qualification\n";

    my $cursor_query = qq{
     declare LOG cursor for
        select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
        from "_$cluster".sl_log_1
        where $origin_qualification and $table_qualification
        order by log_xid, log_actionseq;};

    print "Cursor query: $cursor_query\n";

    my $lastxid = "";
    # Fixed: the file name used $sync, which is never defined; use the
    # sync event number computed above.
    my $syncname = sprintf("log-%08d", $sync_event);
    open(LOGOUTPUT, ">$spoolpath/$syncname")
        or die "cannot create $spoolpath/$syncname: $!";
    print LOGOUTPUT "-- Data for sync $sync_event\n";
    print LOGOUTPUT "-- ", `date`;
    $dbh->exec("begin;");
    $dbh->exec($cursor_query);
    print "Last err:", $dbh->errorMessage, "\n";
    my $foundsome = "YES";
    while ($foundsome eq "YES") {
        $foundsome = "NO";
        my $fetch = $dbh->exec("fetch forward 100 in LOG;");
        while (my @row = $fetch->fetchrow) {
            $foundsome = "YES";
            my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
            if ($xid ne $lastxid) {    # changed xid - report that...
                if ($lastxid ne "") {  # Do nothing first time around...
                    print LOGOUTPUT "COMMIT; -- Done xid $lastxid\n";
                }
                print LOGOUTPUT "BEGIN;\nselect fail_if_xid_applied($xid);\n";
                $lastxid = $xid;
            }
            if ($cmdtype eq "I") {
                printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata;
            } elsif ($cmdtype eq "U") {
                printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata;
            } elsif ($cmdtype eq "D") {
                printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata;
            } else {
                # Fixed: the message said "cmddata" although $cmdtype is
                # what is being examined.
                print LOGOUTPUT "problem: cmdtype not in (I,U,D) = [$cmdtype]\n";
            }
        }
    }
    if ($lastxid ne "") {
        print LOGOUTPUT "COMMIT; -- Done xid $lastxid\n";
    }
    close LOGOUTPUT;
    # The cursor transaction only read data; roll it back.
    $dbh->exec("rollback;");
    my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
                           values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
    print "Confirm: $confirmation\n";
    # Fixed: this used to redeclare "my $cursorexec" in the same scope.
    $dbh->exec($confirmation);
}
+
# Establish the shared database connection ($dbh) using the
# $database/$host/$port/$user globals, with $password appended to the
# DSN only when one was supplied (otherwise rely on .pgpass).
sub connect_to_node {
    my @parts = ("dbname=$database", "host=$host", "port=$port", "user=$user");
    push @parts, "password=$password" if $password;
    $dbh = Pg::connectdb(join(' ', @parts));
}
+
# Parse command-line options into the script-level configuration
# globals.  Getopt::Long is used without linkage arguments, so each
# recognized option is stored in a package variable $opt_<name>, which
# is then copied into the corresponding configuration variable.
sub process_options {

    $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
                           "cluster=s", "password=s", "port=s", "sets=s",
                           "spoolpath=s", "spoolname=s", "pgbins=s",
                           "maxsize=i", "maxage=i", "node=i", "subnode=i");

    if (defined ($opt_help)) {
        show_usage();
    }

    $cluster=$opt_cluster if (defined($opt_cluster));
    $subnode = $opt_subnode if (defined ($opt_subnode));
    $node = $opt_node if (defined($opt_node));
    $database=$opt_database if (defined ($opt_database));
    $user = $opt_user if (defined ($opt_user));
    $host = $opt_host if (defined($opt_host));
    $password = $opt_password if (defined($opt_password));
    $port = $opt_port if (defined($opt_port));
    $pgbins = $opt_pgbins if (defined($opt_pgbins));
    $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
    $spoolname = $opt_spoolname if (defined($opt_spoolname));
    if (defined($opt_sets)) {
        @SETS=split (/,/, $opt_sets);
    }
    if (defined($opt_maxsize)){
        $maxsize = $opt_maxsize;
    } else {
        $maxsize = 10000;    # default spool file size cap, in kB
    }
    # Fixed: a --maxage value used to be assigned to $maxsize, leaving
    # $maxage undefined whenever the option was actually given.
    if (defined($opt_maxage)){
        $maxage = $opt_maxage;
    } else {
        $maxage = 300;       # default spool file age cap, in seconds
    }
}
+
# Print a usage summary for slonspool on stdout, then terminate the
# process (die -1), matching the behavior expected by --help handling.
sub show_usage {
    print <<'USAGE';
slonspool:
 --help get help
 --cluster=s Slony-I cluster name
 --subnode=s Node number subscribed through
 --node=i Node number to use to request
 --pgbins=s Location of PostgreSQL binaries including slonik and pg_dump
 --database=s database to connect to
 --host=s host for database
 --user=s user for database
 --password=s password for database (you should probably use .pgpass instead)
 --port=i port number to connect to
 --sets=s Sets to replicate (comma-delimited) - e.g --sets=1,2,4
 --spoolpath=s directory in which to spool output
 --spoolname=s naming convention for spoolfiles
 --maxsize=i maximum size of spool files, in kB - default =10000KB
 --maxage=i maximum age of spool files in seconds - default 300
USAGE
    die -1;
}
--- /dev/null
+++ src/slonspool/gen_output.pm
@@ -0,0 +1,3 @@
+#!/usr/bin/perl
+
+1;
--- src/slonspool/slonspool
+++ /dev/null
@@ -1,263 +0,0 @@
-#!perl
-
-use Pg;
-use Getopt::Long;
-my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $subnode, $node);
-my @SETS;
-my $dbh;
-process_options();
-connect_to_node();
-subscribe_to_node();
-
-while (1) {
- listen_to_node();
-}
-
-sub subscribe_to_node {
- open(SUBSCRIBE, ">$spoolpath/subscription.log");
- foreach $set (@SETS) {
-
- # Create namespaces
- print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
- my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
- while ( @row = $sth->fetchrow ) {
- my ($namespace) = @row;
- print SUBSCRIBE "create schema $namespace;\n";
- }
- close(SUBSCRIBE);
-
- # Create tables
- $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
- while ( @row = $sth->fetchrow ) {
- my ($table) = @row;
- `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`;
- }
- open(SUBSCRIBE, ">>$spoolpath/subscription.log");
-
- # Pull data, as in copy_set (remote_worker.c)
- my $query = "start transaction; set transaction isolation level serializable; ";
- $sth = $dbh->exec($query);
- my $tquery = qq{
- select T.tab_id,
- "pg_catalog".quote_ident(PGN.nspname) || '.' ||
- "pg_catalog".quote_ident(PGC.relname) as tab_fqname,
- T.tab_idxname, T.tab_comment
- from "$cluster".sl_table T,
- "pg_catalog".pg_class PGC,
- "pg_catalog".pg_namespace PGN
- where T.tab_set = $set
- and T.tab_reloid = PGC.oid
- and PGC.relnamespace = PGN.oid
- order by tab_id;
- };
- }
- $sth=$dbh->exec($tquery);
- while (@row=$sth->fetchrow) {
- my ($table) = @row;
- print SUBSCRIBE qq{copy "$table" from stdin;\n};
- my $query = qq{copy "$table" to stdout;};
- $res = $dbh->exec($query);
- my $line = "*" x 16384;
- $ret = $dbh->getline($line, 16384);
- while ($line ne "\\.") {
- print SUBSCRIBE line, "\n";
- $ret = $dbh->getline($line, 16384);
- }
- print SUBSCRIBE "\.\n";
- }
- close SUBSCRIBE;
- my $seqquery = qq{
- select n.nspname, c.relname
- from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s
- where
- n.oid = c.relnamespace and
- c.oid = s.seq_reloid and
- s.seq_set = $set;};
- $sth=$dbh->exec($seqquery);
- while (my @row=$sth->fetchrow) {
- my ($nsp, $seqname) = @row;
- `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
- }
- # Next, populate Sync information
- # Use the last SYNC's snapshot information and set
- # the action sequence list to all actions after
- # that.
-
- my $squery = qq{
- select ssy_seqno, ssy_minxid, ssy_maxxid,
- ssy_xip, ssy_action_list
- from "$cluster".sl_setsync
- where ssy_setid = $set; };
- $sth=$dbh->exec($squery);
- while (my @row=$sth->fetchrow) {
- my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row;
- }
- my $createsync = qq{
- insert into "_$cluster".sl_setsync
- (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)
- values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');};
- print SUBSCRIBE $createsync, "\n";
-}
-
-sub listen_to_node {
- while (1) {
- process_event();
- sleep 2;
- }
-}
-
-sub process_event {
- my $dsn = "dbname=$database host=$host port=$port user=$user";
- if ($password) {
- $dsn .= " password=$password";
- }
- my $dbh = Pg::connectdb($dsn);
- print "Result:", $dbh->status, " OK=", PGRES_CONNECTION_OK, "\n";
- my $sync_event;
- my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
- where con_origin = $node order by con_seqno desc limit 1;};
- print $last_seq, "\n";
- my $res = $dbh->exec($last_seq);
- while (my @row = $res->fetchrow) {
- ($sync_event) = @row;
- print "Last sync: $sync_event\n";
- }
- $sync_event++;
- print "Next sync: $sync_event\n";
-
- my $get_tables = qq{ select tab_id from "_$cluster".sl_table where tab_set in ($sets); };
- my $origin_qualification = " (log_origin = $sub_node) ";
- my $table_qualification = " ( log_tableid in (";
- my $res = $dbh->exec($get_tables);
- my @TABLES;
- while (my @row=$sth->fetchrow) {
- my ($table_id) = @row;
- push @TABLES, $table_id;
- }
- $table_qualification .= join(',', @TABLES);
- $table_qualification .= "))";
- my $qualification .= " $origin_qualification and $table_qualification ";
- my $get_event_info = qq{select ev_minxid, ev_maxxid, ev_xip from "_$cluster".sl_event where ev_seqno = $sync_event;};
- my $res = $dbh->exec($get_event_info);
- while (my @row=$res->fetchrow) {
- my ($minxid, $maxxid, $ev_zip) = @row;
- if ($ev_zip) {
- $ev_zip = s/'//g; # Strip off unnecessary quotes
- $qualification .= "and ($log_xid < '$maxxid' and \"_$cluster\".xxid_lt_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
- $qualification .= "and ($log_xid >= '$minxid' and \"_$cluster\".xxid_ge_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
- } else {
- $qualification .= "and ($log_xid < '$maxxid') ";
- $qualification .= "and ($log_xid >= '$minxid') ";
- }
- }
-
- my $tables_query = qq{t.tab_id, t.tab_reloid, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace;};
- $res = $dbh->exec($tables_query);
- while (my @row = $res->fetchrow) {
- my ($id, $oid, $namespace, $tname) = @row;
- $TABLENAME[$i] = qq{"$namespace".$tname};
- }
-
- my $cursor_query = qq{
- declare LOG cursor for
- select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
- from "_$cluster".sl_log_1
- where $qualification
- order by log_actionseq;};
-
-
- my $syncname=sprintf("log-%8d", $sync_event);
- open(LOGOUTPUT, ">$spoolpath/$syncname");
- print LOGOUTPUT "-- Data for sync $sync_event\n";
- print LOGOUTPUT "-- ", `date`;
- print LOGOUTPUT "BEGIN;\n";
- my $begin = $dbh->exec("begin;");
- my $cursorexec = $dbh->exec($cursor_query);
- my $foundsome = "YES";
- while ($foundsome eq "YES") {
- $foundsome = "NO";
- my $res = $dbh->exec("fetch forward 100 in LOG;");
- while (my @row = $res->fetchrow) {
- $foundsome = "YES";
- my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
- if ($cmddata eq "I") {
- printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata;
- } elsif ($cmddata eq "U") {
- printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata;
- } elsif ($cmddata eq "D") {
- printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata;
- }
- }
- }
- close LOGOUTPUT;
- my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
- values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
- print "Confirm: $confirmation\n";
- my $cursorexec = $dbh->exec($confirmation);
-}
-
-sub connect_to_node {
- my $dsn = "dbname=$database host=$host port=$port user=$user";
- if ($password) {
- $dsn .= " password=$password";
- }
- $dbh = Pg::connectdb($dsn);
-}
-
-sub process_options {
-
- $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
- "cluster=s", "password=s", "port=s", "sets=s",
- "spoolpath=s", "spoolname=s", "pgbins=s",
- "maxsize=i", "maxage=i", "node=i", "subnode=i");
-
- if (defined ($opt_help)) {
- show_usage();
- }
-
- $cluster=$opt_cluster if (defined($opt_cluster));
- $subnode = $opt_subnode if (defined ($opt_subnode));
- $node = $opt_node if (defined($opt_node));
- $database=$opt_database if (defined ($opt_database));
- $user = $opt_user if (defined ($opt_user));
- $host = $opt_host if (defined($opt_host));
- $password = $opt_password if (defined($opt_password));
- $port = $opt_port if (defined($opt_port));
- $pgbins = $opt_pgbins if (defined($opt_pgbins));
- $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
- $spoolname = $opt_spoolname if (defined($opt_spoolname));
- if (defined($opt_sets)) {
- @SETS=split (/,/, $opt_sets);
- }
- if (defined($opt_maxsize)){
- $maxsize = $opt_maxsize;
- } else {
- $maxsize = 10000;
- }
- if (defined($opt_maxage)){
- $maxsize = $opt_maxage;
- } else {
- $maxage = 300;
- }
-}
-
-sub show_usage {
- print qq{slonspool:
- --help get help
- --cluster=s Slony-I cluster name
- --subnode=s Node number subscribed through
- --node=i Node number to use to request
- --pgbins=s Location of PostgreSQL binaries including slonik and pg_dump
- --database=s database to connect to
- --host=s host for database
- --user=s user for database
- --password=s password for database (you should probably use .pgpass instead)
- --port=i port number to connect to
- --sets=s Sets to replicate (comma-delimited) - e.g --sets=1,2,4
- --spoolpath=s directory in which to spool output
- --spoolname=s naming convention for spoolfiles
- --maxsize=i maximum size of spool files, in kB - default =10000KB
- --maxage=i maximum age of spool files in seconds - default 300
-};
- die -1;
-}
--- /dev/null
+++ src/slonspool/init.pm
@@ -0,0 +1,61 @@
+#!/usr/bin/perl
+# $Id: init.pm,v 1.1 2004/11/30 23:38:40 cbbrowne Exp $
+
+# Data structures...
+# %NODE{}{}
+# Fields:
+# $NODE{$i}{last_event} - Last event processed for node
+#
+# %SET{}{}
+# Fields:
+# $SET{$i}{origin} - origin node
+# $SET{$i}{comment} - Comment about set
+# $SET{$i}{provider} - node that provides data to our favorite node
+#
+# %TABLES
+# $TABLES{$i}{name}
+# $TABLES{$i}{namespace}
+# $TABLES{$i}{set}
+
+# Populate latest information about subscription providers and such...
# Load the current replication configuration from the database into the
# package-level structures %NODE, %SET and %TABLES (see the data
# structure notes at the top of this file).  Connects with the
# $database/$host/$port/$user/$password globals and reads the Slony-I
# catalog tables in schema "_$cluster" for the sets named in $opt_sets.
sub load_configuration {
    my $dsn = "dbname=$database host=$host port=$port user=$user";
    if ($password) {
        $dsn .= " password=$password";
    }
    $dbh = Pg::connectdb($dsn);

    # Populate %NODE with the highest confirmed event seen per origin.
    # Fixed: the confirmation column is con_received, not "received"
    # (compare the sl_confirm insert in slonspool.pl).
    my $confirm_query = qq{ select con_origin, con_seqno from "_$cluster".sl_confirm where con_received = $node; };
    my $res = $dbh->exec($confirm_query);
    while (my @row = $res->fetchrow) {
        my ($origin, $sync) = @row;
        if ($NODE{$origin}{last_event} < $sync) {
            $NODE{$origin}{last_event} = $sync;
        }
    }

    # Populate %SET with set info for the sets being handled.
    # Fixed: this used to re-execute $confirm_query instead of the
    # set query it had just built.
    my $sub_set_query = qq{ select set_id, set_origin from "_$cluster".sl_set where set_id in ($opt_sets);};
    $res = $dbh->exec($sub_set_query);
    while (my @row = $res->fetchrow) {
        my ($set, $origin) = @row;
        $SET{$set}{origin} = $origin;
    }

    # Populate %TABLES with name/namespace/set for each replicated table.
    my $tables_query = qq{select t.tab_id, t.tab_set, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace and tab_set in ($opt_sets) ;};
    $res = $dbh->exec($tables_query);
    while (my @row = $res->fetchrow) {
        my ($id, $set, $namespace, $tname) = @row;
        $TABLES{$id}{name} = $tname;
        $TABLES{$id}{namespace} = $namespace;
        $TABLES{$id}{set} = $set;
    }
}
+
# Remember the comment describing node $id in the package-level
# @NODES array, indexed by node id.
sub storeNode {
    my ($node_id, $node_comment) = @_;
    $NODES[$node_id] = $node_comment;
}
+
+1;
- Previous message: [Slony1-commit] By cbbrowne: Added in some code to process syncs; still very rudimentary
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list