Wed Nov 10 18:17:46 PST 2004
- Previous message: [Slony1-commit] By cbbrowne: Add in "draft" version of RebuildListenEntries() that
- Next message: [Slony1-commit] By cbbrowne: Add a list of features to be worked on
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Add in some notes on implementation strategy, as well as the
beginnings of a "slonspool" implemented in Perl
Modified Files:
--------------
slony1-engine/src/slonspool:
NOTES (r1.1 -> r1.2)
Added Files:
-----------
slony1-engine/src/slonspool:
slonspool (r1.1)
-------------- next part --------------
Index: NOTES
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonspool/NOTES,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/slonspool/NOTES -Lsrc/slonspool/NOTES -u -w -r1.1 -r1.2
--- src/slonspool/NOTES
+++ src/slonspool/NOTES
@@ -73,12 +73,26 @@
SPOOLPATH
+ -p "/var/spool/slony1/something"
+
b) Naming convention for the spool files, likely using a
strftime()-conformant name string, also with the option of
having it use the starting and/or ending SYNC ids.
+ -n "something"
+
c) There needs to be some sort of "subscribe" notion...
+ This is implemented _inside_ the spooler.
+
+ --> For each table, run pg_dump -s -t on the table
+
+ --> Then do a "copy from" on each table
+
+ d) How often to split between files???
+
+ --> Combo of time, number of syncs, size
+
Q5: What should the logs consist of?
-> Should they simply consist of the updates on the tables Slony-I
--- /dev/null
+++ src/slonspool/slonspool
@@ -0,0 +1,171 @@
+#!perl
+
+use Pg;
+use Getopt::Long;
+my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $node);
+my @SETS;
+my $dbh;
+process_options();
+connect_to_node();
+subscribe_to_node();
+
+while (1) {
+ listen_to_node();
+}
+
+sub subscribe_to_node {
+ open(SUBSCRIBE, ">$spoolpath/subscription.log");
+ foreach $set (@SETS) {
+
+ # Create namespaces
+ print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
+ my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
+ while ( @row = $sth->fetchrow ) {
+ my ($namespace) = @row;
+ print SUBSCRIBE "create schema $namespace;\n";
+ }
+ close(SUBSCRIBE);
+
+ # Create tables
+ $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
+ while ( @row = $sth->fetchrow ) {
+ my ($table) = @row;
+ `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`;
+ }
+ open(SUBSCRIBE, ">>$spoolpath/subscription.log");
+
+ # Pull data, as in copy_set (remote_worker.c)
+ my $query = "start transaction; set transaction isolation level serializable; ";
+ $sth = $dbh->exec($query);
+ my $tquery = qq{
+ select T.tab_id,
+ "pg_catalog".quote_ident(PGN.nspname) || '.' ||
+ "pg_catalog".quote_ident(PGC.relname) as tab_fqname,
+ T.tab_idxname, T.tab_comment
+ from "$cluster".sl_table T,
+ "pg_catalog".pg_class PGC,
+ "pg_catalog".pg_namespace PGN
+ where T.tab_set = $set
+ and T.tab_reloid = PGC.oid
+ and PGC.relnamespace = PGN.oid
+ order by tab_id;
+ };
+
+
+ }
+ $sth=$dbh->exec($tquery);
+ while (@row=$sth->fetchrow) {
+ my ($table) = @row;
+ print SUBSCRIBE qq{copy "$table" from stdin;\n};
+ my $query = qq{copy "$table" to stdout;};
+ $res = $dbh->exec($query);
+ my $line = "*" x 16384;
+ $ret = $dbh->getline($line, 16384);
+ while ($line ne "\\.") {
+ print SUBSCRIBE, line, "\n";
+ $ret = $dbh->getline($line, 16384);
+ }
+ print SUBSCRIBE "\.\n";
+ }
+ close SUBSCRIBE;
+ my $seqquery = qq{
+ select n.nspname, c.relname
+ from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s
+ where
+ n.oid = c.relnamespace and
+ c.oid = s.seq_reloid and
+ s.seq_set = $set;};
+ $sth=$dbh->exec($seqquery);
+ while (my @row=$sth->fetchrow) {
+ my ($nsp, $seqname) = @row;
+ `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
+ }
+
+ # Next, populate Sync information
+ # Use the last SYNC's snapshot information and set
+ # the action sequence list to all actions after
+ # that.
+
+ my $squery = qq{
+ select ssy_seqno, ssy_minxid, ssy_maxxid,
+ ssy_xip, ssy_action_list
+ from "$cluster".sl_setsync
+ where ssy_setid = $set; };
+ $sth=$dbh->exec($squery);
+ while (my @row=$sth->fetchrow) {
+ my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row;
+ }
+ my $createsync = qq{
+ insert into "_$cluster".sl_setsync
+ (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)
+ values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');};
+ print SUBSCRIBE $createsync, "\n";
+}
+
+sub listen_to_node {
+
+}
+
+sub connect_to_node {
+ my $dsn = "dbname=$database host=$host port=$port user=$user";
+ if ($password) {
+ $dsn .= " password=$password";
+ }
+ $dbh = Pg::connectdb($dsn);
+}
+
+sub process_options {
+
+ $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
+ "cluster=s", "password=s", "port=s", "sets=s",
+ "spoolpath=s", "spoolname=s", "pgbins=s",
+ "maxsize=i", "maxage=i", "node=i");
+
+ if (defined ($opt_help)) {
+ show_usage();
+ }
+
+ $cluster=$opt_cluster if (defined($opt_cluster));
+ $node = $opt_node if (defined($opt_node));
+ $database=$opt_database if (defined ($opt_database));
+ $user = $opt_user if (defined ($opt_user));
+ $host = $opt_host if (defined($opt_host));
+ $password = $opt_password if (defined($opt_password));
+ $port = $opt_port if (defined($opt_port));
+ $pgbins = $opt_pgbins if (defined($opt_pgbins));
+ $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
+ $spoolname = $opt_spoolname if (defined($opt_spoolname));
+ if (defined($opt_sets)) {
+ @SETS=split (/,/, $opt_sets);
+ }
+ if (defined($opt_maxsize)){
+ $maxsize = $opt_maxsize;
+ } else {
+ $maxsize = 10000;
+ }
+ if (defined($opt_maxage)){
+ $maxsize = $opt_maxage;
+ } else {
+ $maxage = 300;
+ }
+}
+
+sub show_usage {
+ print qq{slonspool:
+ --help get help
+ --cluster=s Slony-I cluster name
+ --node=i Node number to use to request
+ --pgbins=s Location of PostgreSQL binaries including slonik and pg_dump
+ --database=s database to connect to
+ --host=s host for database
+ --user=s user for database
+ --password=s password for database (you should probably use .pgpass instead)
+ --port=i port number to connect to
+ --sets=s Sets to replicate (comma-delimited) - e.g --sets=1,2,4
+ --spoolpath=s directory in which to spool output
+ --spoolname=s naming convention for spoolfiles
+ --maxsize=i maximum size of spool files, in kB - default =10000KB
+ --maxage=i maximum age of spool files in seconds - default 300
+};
+ die -1;
+}
- Previous message: [Slony1-commit] By cbbrowne: Add in "draft" version of RebuildListenEntries() that
- Next message: [Slony1-commit] By cbbrowne: Add a list of features to be worked on
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list