Wed Mar 16 17:15:43 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Add Debian packaging control files, contributed by Tim
- Next message: [Slony1-commit] By smsimms: Vary the sleep_seconds by up to 50% in either direction.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Change the SUBSCRIBE_SET event so that it starts by checking for
the availability of tables on the subscriber node, and subscribes
sequences _BEFORE_ copying all the data.
That way we find problems before spending 8 hours copying data.
As suggested by Hannu Krosing.
Modified Files:
--------------
slony1-engine/src/slon:
remote_worker.c (r1.77 -> r1.78)
-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.77
retrieving revision 1.78
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.77 -r1.78
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -2352,6 +2352,180 @@
}
}
+ /* cbbrowne - in progress - check tables/sequences in set to
+ * make sure they are there and in good order. Don't copy any
+ * data yet; we want to just do a first pass that finds "bozo
+ * errors" */
+
+ /* Check tables and sequences in set to make sure they are all
+ * appropriately configured... */
+
+ /*
+ * Select the list of all tables the provider currently has in the set.
+ */
+ slon_mkquery(&query1,
+ "select T.tab_id, "
+ " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
+ " \"pg_catalog\".quote_ident(PGC.relname) as tab_fqname, "
+ " T.tab_idxname, T.tab_comment "
+ "from %s.sl_table T, "
+ " \"pg_catalog\".pg_class PGC, "
+ " \"pg_catalog\".pg_namespace PGN "
+ "where T.tab_set = %d "
+ " and T.tab_reloid = PGC.oid "
+ " and PGC.relnamespace = PGN.oid "
+ "order by tab_id; ",
+ rtcfg_namespace, set_id);
+ res1 = PQexec(pro_dbconn, dstring_data(&query1));
+ if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+ node->no_id, dstring_data(&query1),
+ PQresultErrorMessage(res1));
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ ntuples1 = PQntuples(res1);
+
+ /*
+ * For each table in the set
+ */
+ for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
+ {
+ int tab_id = strtol(PQgetvalue(res1, tupno1, 0), NULL, 10);
+ char *tab_fqname = PQgetvalue(res1, tupno1, 1);
+ char *tab_idxname = PQgetvalue(res1, tupno1, 2);
+ char *tab_comment = PQgetvalue(res1, tupno1, 3);
+ int64 copysize = 0;
+
+ gettimeofday(&tv_start2, NULL);
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+ "prepare to copy table %s\n",
+ node->no_id, tab_fqname);
+
+ /*
+ * Find out if the table we're copying has the special slony serial
+ * number key on the provider DB
+ */
+ slon_mkquery(&query1,
+ "select %s.tableHasSerialKey('%q');",
+ rtcfg_namespace, tab_fqname);
+ res2 = PQexec(pro_dbconn, dstring_data(&query1));
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+ node->no_id, dstring_data(&query1),
+ PQresultErrorMessage(res2));
+ PQclear(res2);
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ rc = *PQgetvalue(res2, 0, 0) == 't';
+ PQclear(res2);
+
+ if (rc)
+ {
+ /*
+ * It has, check if the table has this on the local DB too.
+ */
+ slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+ "table %s will require Slony-I serial key\n",
+ node->no_id, tab_fqname);
+ res2 = PQexec(loc_dbconn, dstring_data(&query1));
+ if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+ node->no_id, dstring_data(&query1),
+ PQresultErrorMessage(res2));
+ PQclear(res2);
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ rc = *PQgetvalue(res2, 0, 0) == 't';
+ PQclear(res2);
+
+ if (!rc)
+ {
+ slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+ "table %s Slony-I serial key to be added local\n",
+ node->no_id, tab_fqname);
+ }
+ }
+ else
+ {
+ slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+ "table %s does not require Slony-I serial key\n",
+ node->no_id, tab_fqname);
+ }
+ }
+ PQclear(res1);
+
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+ "all tables for set %d found on subscriber\n",
+ node->no_id, set_id);
+ /*
+ * Add in the sequences contained in the set
+ */
+ slon_mkquery(&query1,
+ "select SQ.seq_id, "
+ " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
+ " \"pg_catalog\".quote_ident(PGC.relname), "
+ " SQ.seq_comment "
+ " from %s.sl_sequence SQ, "
+ " \"pg_catalog\".pg_class PGC, "
+ " \"pg_catalog\".pg_namespace PGN "
+ " where SQ.seq_set = %d "
+ " and PGC.oid = SQ.seq_reloid "
+ " and PGN.oid = PGC.relnamespace; ",
+ rtcfg_namespace, set_id);
+ res1 = PQexec(pro_dbconn, dstring_data(&query1));
+ if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+ node->no_id, dstring_data(&query1),
+ PQresultErrorMessage(res1));
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ ntuples1 = PQntuples(res1);
+ for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
+ {
+ char *seq_id = PQgetvalue(res1, tupno1, 0);
+ char *seq_fqname = PQgetvalue(res1, tupno1, 1);
+ char *seq_comment = PQgetvalue(res1, tupno1, 2);
+
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+ "copy sequence %s\n",
+ node->no_id, seq_fqname);
+
+ slon_mkquery(&query1,
+ "select %s.setAddSequence_int(%d, %s, '%q', '%q')",
+ rtcfg_namespace, set_id, seq_id,
+ seq_fqname, seq_comment);
+ if (query_execute(node, loc_dbconn, &query1) < 0)
+ {
+ PQclear(res1);
+ slon_disconnectdb(pro_conn);
+ dstring_free(&query1);
+ terminate_log_archive();
+ return -1;
+ }
+ }
+ PQclear(res1);
+
+
/*
* Select the list of all tables the provider currently has in the set.
*/
@@ -2842,55 +3016,60 @@
/*
* Copy the sequences contained in the set
*/
- slon_mkquery(&query1,
- "select SQ.seq_id, "
- " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
- " \"pg_catalog\".quote_ident(PGC.relname), "
- " SQ.seq_comment "
- " from %s.sl_sequence SQ, "
- " \"pg_catalog\".pg_class PGC, "
- " \"pg_catalog\".pg_namespace PGN "
- " where SQ.seq_set = %d "
- " and PGC.oid = SQ.seq_reloid "
- " and PGN.oid = PGC.relnamespace; ",
- rtcfg_namespace, set_id);
- res1 = PQexec(pro_dbconn, dstring_data(&query1));
- if (PQresultStatus(res1) != PGRES_TUPLES_OK)
- {
- slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
- node->no_id, dstring_data(&query1),
- PQresultErrorMessage(res1));
- PQclear(res1);
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- terminate_log_archive();
- return -1;
- }
- ntuples1 = PQntuples(res1);
- for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
- {
- char *seq_id = PQgetvalue(res1, tupno1, 0);
- char *seq_fqname = PQgetvalue(res1, tupno1, 1);
- char *seq_comment = PQgetvalue(res1, tupno1, 2);
-
- slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
- "copy sequence %s\n",
- node->no_id, seq_fqname);
- slon_mkquery(&query1,
- "select %s.setAddSequence_int(%d, %s, '%q', '%q')",
- rtcfg_namespace, set_id, seq_id,
- seq_fqname, seq_comment);
- if (query_execute(node, loc_dbconn, &query1) < 0)
- {
- PQclear(res1);
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- terminate_log_archive();
- return -1;
- }
- }
- PQclear(res1);
+ /* The copy of sequences is being done earlier, before we
+ * start doing tables, so that if anything is missing, that is
+ * noticed BEFORE 8 hours of copying of data takes place... */
+
+/* slon_mkquery(&query1, */
+/* "select SQ.seq_id, " */
+/* " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " */
+/* " \"pg_catalog\".quote_ident(PGC.relname), " */
+/* " SQ.seq_comment " */
+/* " from %s.sl_sequence SQ, " */
+/* " \"pg_catalog\".pg_class PGC, " */
+/* " \"pg_catalog\".pg_namespace PGN " */
+/* " where SQ.seq_set = %d " */
+/* " and PGC.oid = SQ.seq_reloid " */
+/* " and PGN.oid = PGC.relnamespace; ", */
+/* rtcfg_namespace, set_id); */
+/* res1 = PQexec(pro_dbconn, dstring_data(&query1)); */
+/* if (PQresultStatus(res1) != PGRES_TUPLES_OK) */
+/* { */
+/* slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", */
+/* node->no_id, dstring_data(&query1), */
+/* PQresultErrorMessage(res1)); */
+/* PQclear(res1); */
+/* slon_disconnectdb(pro_conn); */
+/* dstring_free(&query1); */
+/* terminate_log_archive(); */
+/* return -1; */
+/* } */
+/* ntuples1 = PQntuples(res1); */
+/* for (tupno1 = 0; tupno1 < ntuples1; tupno1++) */
+/* { */
+/* char *seq_id = PQgetvalue(res1, tupno1, 0); */
+/* char *seq_fqname = PQgetvalue(res1, tupno1, 1); */
+/* char *seq_comment = PQgetvalue(res1, tupno1, 2); */
+
+/* slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " */
+/* "copy sequence %s\n", */
+/* node->no_id, seq_fqname); */
+
+/* slon_mkquery(&query1, */
+/* "select %s.setAddSequence_int(%d, %s, '%q', '%q')", */
+/* rtcfg_namespace, set_id, seq_id, */
+/* seq_fqname, seq_comment); */
+/* if (query_execute(node, loc_dbconn, &query1) < 0) */
+/* { */
+/* PQclear(res1); */
+/* slon_disconnectdb(pro_conn); */
+/* dstring_free(&query1); */
+/* terminate_log_archive(); */
+/* return -1; */
+/* } */
+/* } */
+/* PQclear(res1); */
/*
* And copy over the sequence last_value corresponding to the
- Previous message: [Slony1-commit] By cbbrowne: Add Debian packaging control files, contributed by Tim
- Next message: [Slony1-commit] By smsimms: Vary the sleep_seconds by up to 50% in either direction.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list