CVS User Account cvsuser
Wed Nov 10 18:17:46 PST 2004
Log Message:
-----------
Add in some notes on implementation strategy, as well as the
beginnings of a "slonspool" implemented in Perl

Modified Files:
--------------
    slony1-engine/src/slonspool:
        NOTES (r1.1 -> r1.2)

Added Files:
-----------
    slony1-engine/src/slonspool:
        slonspool (r1.1)

-------------- next part --------------
Index: NOTES
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonspool/NOTES,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slonspool/NOTES -Lsrc/slonspool/NOTES -u -w -r1.1 -r1.2
--- src/slonspool/NOTES
+++ src/slonspool/NOTES
@@ -73,12 +73,26 @@
 
       SPOOLPATH
 
+      -p "/var/spool/slony1/something"
+
    b) Naming convention for the spool files, likely using a
       strftime()-conformant name string, also with the option of
       having it use the starting and/or ending SYNC ids.
 
+      -n "something"      
+
    c) There needs to be some sort of "subscribe" notion...
 
+      This is implemented _inside_ the spooler.
+
+      --> For each table, run pg_dump -s -t on the table
+
+      --> Then do a "copy from" on each table
+
+   d) How often to split between files???
+
+      --> Combo of time, number of syncs, size
+
 Q5: What should the logs consist of?
 
   -> Should they simply consist of the updates on the tables Slony-I
--- /dev/null
+++ src/slonspool/slonspool
@@ -0,0 +1,171 @@
+#!perl
+# slonspool: connect to a Slony-I node, write the initial subscription for the
+# requested sets into the spool directory, then loop calling listen_to_node().
+
+use Pg;
+use Getopt::Long;
+# Settings filled in by process_options().  $pgbins was previously an undeclared
+# package global; it is declared here with the rest of the configuration.
+my ($database, $user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $node, $pgbins);
+my (@SETS, $dbh);    # replication set ids to spool; Pg connection handle
+process_options();
+connect_to_node();
+subscribe_to_node();
+listen_to_node() while (1);    # NOTE(review): listen_to_node() is an empty stub, so this spins
+
+# Write the initial subscription for each set in @SETS to
+# $spoolpath/subscription.log: a "create schema" per namespace, the table
+# definitions (pg_dump -s), the table contents in COPY format, the sequences
+# (pg_dump) and an insert recreating the set's sl_setsync row.
+#
+# Reads the globals $dbh, $cluster, $node, $host, $port, $database, $user,
+# $pgbins and $spoolpath.  Dies when the spool file cannot be written, the
+# backend rejects a query, or pg_dump fails.
+sub subscribe_to_node {
+  my $logfile = "$spoolpath/subscription.log";
+  open(my $spool, '>', $logfile) or die "slonspool: cannot write $logfile: $!\n";
+  my @pg_dump = ("$pgbins/pg_dump", '-p', $port, '-h', $host, '-U', $user);
+  # Shared FROM/WHERE joining the cluster's replicated tables to the catalogs.
+  my $tables = qq{from "_$cluster".sl_table t, "pg_catalog".pg_class c, "pg_catalog".pg_namespace n
+                  where t.tab_reloid = c.oid and c.relnamespace = n.oid};
+
+  # Everything below is per set; it must stay inside this loop to see $set.
+  foreach my $set (@SETS) {
+    print $spool "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
+    _run_query("start transaction; set transaction isolation level serializable;");    # one snapshot per set
+
+    # Create namespaces
+    my $sth = _run_query("select distinct n.nspname $tables and t.tab_set = $set;");
+    while (my ($namespace) = $sth->fetchrow) {
+      print $spool "create schema $namespace;\n";
+    }
+
+    # Create tables
+    $sth = _run_query("select n.nspname || '.' || c.relname $tables and t.tab_set = $set;");
+    while (my ($table) = $sth->fetchrow) {
+      _spool_pg_dump($spool, @pg_dump, '-t', $table, '-s', $database);
+    }
+
+    # Pull data, as in copy_set (remote_worker.c).  The name is already quoted by
+    # quote_ident(), so it is used as is; quoting it again would name a different table.
+    $sth = _run_query(qq{select "pg_catalog".quote_ident(n.nspname) || '.' ||
+                         "pg_catalog".quote_ident(c.relname) as tab_fqname
+                         $tables and t.tab_set = $set order by t.tab_id;});
+    while (my ($fqname) = $sth->fetchrow) {
+      print $spool "copy $fqname from stdin;\n";
+      _run_query("copy $fqname to stdout;");
+      # getline returns -1 at EOF, 0 for a complete line and 1 when the buffer
+      # filled before the newline; "\." on a line of its own ends the data.
+      my ($line, $ret, $line_start) = ("*" x 16384, 0, 1);
+      while (($ret = $dbh->getline($line, 16384)) != -1) {
+        last if ($line_start && $line eq "\\.");
+        print $spool $line, ($ret == 0 ? "\n" : "");
+        $line_start = ($ret == 0);
+      }
+      $dbh->endcopy;
+      print $spool "\\.\n";
+    }
+
+    # Sequences (pg_namespace now carries the alias n that the query refers to)
+    $sth = _run_query(qq{select n.nspname, c.relname
+                         from "pg_catalog".pg_class c, "pg_catalog".pg_namespace n, "_$cluster".sl_sequence s
+                         where n.oid = c.relnamespace and c.oid = s.seq_reloid and s.seq_set = $set;});
+    while (my ($nsp, $seqname) = $sth->fetchrow) {
+      _spool_pg_dump($spool, @pg_dump, '-n', $nsp, '-t', $seqname, $database);
+    }
+
+    # Next, populate Sync information from the set's sl_setsync row (last SYNC snapshot and action list).
+    $sth = _run_query(qq{select ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list
+                         from "_$cluster".sl_setsync where ssy_setid = $set;});
+    while (my ($seqno, $minxid, $maxxid, $xip, $actionlist) = $sth->fetchrow) {
+      print $spool qq{\n    insert into "_$cluster".sl_setsync \n\t   (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)\n},
+                   qq{    values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');\n};
+    }
+    _run_query("commit;");    # end this set's transaction before the next one starts
+  }
+  close($spool) or die "slonspool: cannot close $logfile: $!\n";
+}
+
+# Run $sql on $dbh and return the Pg result; dies with the backend's message on error.
+sub _run_query {
+  my $res = $dbh->exec($_[0]);
+  my $status = $res->resultStatus;
+  die "slonspool: query failed: " . $dbh->errorMessage . "\n" if ($status == PGRES_BAD_RESPONSE || $status == PGRES_FATAL_ERROR);
+  return $res;
+}
+
+# Run a command (list form, so no shell is involved) and append its output to $spool.
+sub _spool_pg_dump {
+  my ($spool, @command) = @_;
+  open(my $dump, '-|', @command) or die "slonspool: cannot run $command[0]: $!\n";
+  print $spool $_ while (<$dump>);
+  close($dump) or die "slonspool: $command[0] failed (exit status $?)\n";
+}
+
+sub listen_to_node {    # placeholder: no listening logic has been written yet
+  # NOTE(review): the main loop calls this forever, so it busy-loops until implemented.
+}
+
+# Open the libpq connection to the source node into $dbh; dies if the connection fails.
+sub connect_to_node {
+  my $dsn = "dbname=$database host=$host port=$port user=$user";
+  $dsn .= " password=$password" if ($password);
+  $dbh = Pg::connectdb($dsn);
+  die "slonspool: cannot connect to $database on $host:$port: " . $dbh->errorMessage . "\n" unless ($dbh->status == PGRES_CONNECTION_OK);
+}
+
+# Parse the command line into the script's configuration variables.
+#
+# Getopt::Long stores each option it recognises in the package global
+# $opt_<name>; a value is copied into the matching configuration variable only
+# when the option was given, so anything omitted keeps its previous value.
+# --sets is split on commas into @SETS; maxsize and maxage default to 10000
+# and 300.  --help, or a command line that GetOptions rejects, ends in
+# show_usage().
+sub process_options {
+  $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
+                         "cluster=s", "password=s", "port=s", "sets=s",
+                         "spoolpath=s", "spoolname=s", "pgbins=s",
+                         "maxsize=i", "maxage=i", "node=i");
+
+  # The GetOptions result used to be stored but never examined, so the script
+  # carried on after a mistyped option.
+  show_usage() if (!$goodopts || defined($opt_help));
+
+  $cluster   = $opt_cluster   if (defined($opt_cluster));
+  $node      = $opt_node      if (defined($opt_node));
+  $database  = $opt_database  if (defined($opt_database));
+  $user      = $opt_user      if (defined($opt_user));
+  $host      = $opt_host      if (defined($opt_host));
+  $password  = $opt_password  if (defined($opt_password));
+  $port      = $opt_port      if (defined($opt_port));
+  $pgbins    = $opt_pgbins    if (defined($opt_pgbins));
+  $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
+  $spoolname = $opt_spoolname if (defined($opt_spoolname));
+  @SETS      = split(/,/, $opt_sets) if (defined($opt_sets));
+
+  # --maxage used to be assigned to $maxsize, which clobbered --maxsize and
+  # left $maxage undefined whenever the option was given.
+  $maxsize = defined($opt_maxsize) ? $opt_maxsize : 10000;
+  $maxage  = defined($opt_maxage)  ? $opt_maxage  : 300;
+}
+
+sub show_usage {    # Print the option summary on stdout, then end the script with status 1.
+  print qq{slonspool:
+     --help                get help
+     --cluster=s           Slony-I cluster name
+     --node=i              Node number to use to request 
+     --pgbins=s            Location of PostgreSQL binaries including slonik and pg_dump
+     --database=s          database to connect to
+     --host=s              host for database
+     --user=s              user for database
+     --password=s          password for database (you should probably use .pgpass instead)
+     --port=i              port number to connect to
+     --sets=s              Sets to replicate (comma-delimited) - e.g --sets=1,2,4 
+     --spoolpath=s         directory in which to spool output
+     --spoolname=s         naming convention for spoolfiles
+     --maxsize=i           maximum size of spool files, in kB - default =10000KB
+     --maxage=i            maximum age of spool files in seconds - default 300
+};
+  exit 1;    # was "die -1", which printed a stray "-1 at ..." and left the exit status to $!/$?
+}


More information about the Slony1-commit mailing list