Wed Mar 16 17:15:43 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Add Debian packaging control files, contributed by Tim
- Next message: [Slony1-commit] By smsimms: Vary the sleep_seconds by up to 50% in either direction.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Change the SUBSCRIBE_SET event so that it starts by checking for the availability of tables on the subscriber node, and subscribes sequences _BEFORE_ copying all the data. That way we find problems before doing 8h of copying of data... As suggested by Hannu Krosing Modified Files: -------------- slony1-engine/src/slon: remote_worker.c (r1.77 -> r1.78) -------------- next part -------------- Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.77 retrieving revision 1.78 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.77 -r1.78 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -2352,6 +2352,180 @@ } } + /* cbbrowne - in progress - check tables/sequences in set to + * make sure they are there and in good order. Don't copy any + * data yet; we want to just do a first pass that finds "bozo + * errors" */ + + /* Check tables and sequences in set to make sure they are all + * appropriately configured... */ + + /* + * Select the list of all tables the provider currently has in the set. + */ + slon_mkquery(&query1, + "select T.tab_id, " + " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " + " \"pg_catalog\".quote_ident(PGC.relname) as tab_fqname, " + " T.tab_idxname, T.tab_comment " + "from %s.sl_table T, " + " \"pg_catalog\".pg_class PGC, " + " \"pg_catalog\".pg_namespace PGN " + "where T.tab_set = %d " + " and T.tab_reloid = PGC.oid " + " and PGC.relnamespace = PGN.oid " + "order by tab_id; ", + rtcfg_namespace, set_id); + res1 = PQexec(pro_dbconn, dstring_data(&query1)); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query1), + PQresultErrorMessage(res1)); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + ntuples1 = PQntuples(res1); + + /* + * For each table in the set + */ + for (tupno1 = 0; tupno1 < ntuples1; tupno1++) + { + int tab_id = strtol(PQgetvalue(res1, tupno1, 0), NULL, 10); + char *tab_fqname = PQgetvalue(res1, tupno1, 1); + char *tab_idxname = PQgetvalue(res1, tupno1, 2); + char *tab_comment = PQgetvalue(res1, tupno1, 3); + int64 copysize = 0; + + gettimeofday(&tv_start2, NULL); + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " + "prepare to copy table %s\n", + node->no_id, tab_fqname); + + /* + * Find out if the table we're copying has the special slony serial + * number key on the provider DB + */ + slon_mkquery(&query1, + "select %s.tableHasSerialKey('%q');", + rtcfg_namespace, tab_fqname); + res2 = PQexec(pro_dbconn, dstring_data(&query1)); + if (PQresultStatus(res2) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query1), + PQresultErrorMessage(res2)); + PQclear(res2); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + rc = *PQgetvalue(res2, 0, 0) == 't'; + PQclear(res2); + + if (rc) + { + /* + * It has, check if the table has this on the local DB too. + */ + slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " + "table %s will require Slony-I serial key\n", + node->no_id, tab_fqname); + res2 = PQexec(loc_dbconn, dstring_data(&query1)); + if (PQresultStatus(res2) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query1), + PQresultErrorMessage(res2)); + PQclear(res2); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + rc = *PQgetvalue(res2, 0, 0) == 't'; + PQclear(res2); + + if (!rc) + { + slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " + "table %s Slony-I serial key to be added local\n", + node->no_id, tab_fqname); + } + } + else + { + slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " + "table %s does not require Slony-I serial key\n", + node->no_id, tab_fqname); + } + } + PQclear(res1); + + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " + "all tables for set %d found on subscriber\n", + node->no_id, set_id); + /* + * Add in the sequences contained in the set + */ + slon_mkquery(&query1, + "select SQ.seq_id, " + " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " + " \"pg_catalog\".quote_ident(PGC.relname), " + " SQ.seq_comment " + " from %s.sl_sequence SQ, " + " \"pg_catalog\".pg_class PGC, " + " \"pg_catalog\".pg_namespace PGN " + " where SQ.seq_set = %d " + " and PGC.oid = SQ.seq_reloid " + " and PGN.oid = PGC.relnamespace; ", + rtcfg_namespace, set_id); + res1 = PQexec(pro_dbconn, dstring_data(&query1)); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query1), + PQresultErrorMessage(res1)); + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + ntuples1 = PQntuples(res1); + for (tupno1 = 0; tupno1 < ntuples1; tupno1++) + { + char *seq_id = PQgetvalue(res1, tupno1, 0); + char *seq_fqname = PQgetvalue(res1, tupno1, 1); + char *seq_comment = PQgetvalue(res1, tupno1, 2); + + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " + "copy sequence %s\n", + node->no_id, seq_fqname); + + slon_mkquery(&query1, + "select %s.setAddSequence_int(%d, %s, '%q', '%q')", + rtcfg_namespace, set_id, seq_id, + seq_fqname, seq_comment); + if (query_execute(node, loc_dbconn, &query1) < 0) + { + PQclear(res1); + slon_disconnectdb(pro_conn); + dstring_free(&query1); + terminate_log_archive(); + return -1; + } + } + PQclear(res1); + + /* * Select the list of all tables the provider currently has in the set. */ @@ -2842,55 +3016,60 @@ /* * Copy the sequences contained in the set */ - slon_mkquery(&query1, - "select SQ.seq_id, " - " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " - " \"pg_catalog\".quote_ident(PGC.relname), " - " SQ.seq_comment " - " from %s.sl_sequence SQ, " - " \"pg_catalog\".pg_class PGC, " - " \"pg_catalog\".pg_namespace PGN " - " where SQ.seq_set = %d " - " and PGC.oid = SQ.seq_reloid " - " and PGN.oid = PGC.relnamespace; ", - rtcfg_namespace, set_id); - res1 = PQexec(pro_dbconn, dstring_data(&query1)); - if (PQresultStatus(res1) != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", - node->no_id, dstring_data(&query1), - PQresultErrorMessage(res1)); - PQclear(res1); - slon_disconnectdb(pro_conn); - dstring_free(&query1); - terminate_log_archive(); - return -1; - } - ntuples1 = PQntuples(res1); - for (tupno1 = 0; tupno1 < ntuples1; tupno1++) - { - char *seq_id = PQgetvalue(res1, tupno1, 0); - char *seq_fqname = PQgetvalue(res1, tupno1, 1); - char *seq_comment = PQgetvalue(res1, tupno1, 2); - - slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " - "copy sequence %s\n", - node->no_id, seq_fqname); - slon_mkquery(&query1, - "select %s.setAddSequence_int(%d, %s, '%q', '%q')", - rtcfg_namespace, set_id, seq_id, - seq_fqname, seq_comment); - if (query_execute(node, loc_dbconn, &query1) < 0) - { - PQclear(res1); - slon_disconnectdb(pro_conn); - dstring_free(&query1); - terminate_log_archive(); - return -1; - } - } - PQclear(res1); + /* The copy of sequences is being done earlier, before we + * start doing tables, so that if anything is missing, that is + * noticed BEFORE 8 hours of copying of data takes place... */ + +/* slon_mkquery(&query1, */ +/* "select SQ.seq_id, " */ +/* " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " */ +/* " \"pg_catalog\".quote_ident(PGC.relname), " */ +/* " SQ.seq_comment " */ +/* " from %s.sl_sequence SQ, " */ +/* " \"pg_catalog\".pg_class PGC, " */ +/* " \"pg_catalog\".pg_namespace PGN " */ +/* " where SQ.seq_set = %d " */ +/* " and PGC.oid = SQ.seq_reloid " */ +/* " and PGN.oid = PGC.relnamespace; ", */ +/* rtcfg_namespace, set_id); */ +/* res1 = PQexec(pro_dbconn, dstring_data(&query1)); */ +/* if (PQresultStatus(res1) != PGRES_TUPLES_OK) */ +/* { */ +/* slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", */ +/* node->no_id, dstring_data(&query1), */ +/* PQresultErrorMessage(res1)); */ +/* PQclear(res1); */ +/* slon_disconnectdb(pro_conn); */ +/* dstring_free(&query1); */ +/* terminate_log_archive(); */ +/* return -1; */ +/* } */ +/* ntuples1 = PQntuples(res1); */ +/* for (tupno1 = 0; tupno1 < ntuples1; tupno1++) */ +/* { */ +/* char *seq_id = PQgetvalue(res1, tupno1, 0); */ +/* char *seq_fqname = PQgetvalue(res1, tupno1, 1); */ +/* char *seq_comment = PQgetvalue(res1, tupno1, 2); */ + +/* slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " */ +/* "copy sequence %s\n", */ +/* node->no_id, seq_fqname); */ + +/* slon_mkquery(&query1, */ +/* "select %s.setAddSequence_int(%d, %s, '%q', '%q')", */ +/* rtcfg_namespace, set_id, seq_id, */ +/* seq_fqname, seq_comment); */ +/* if (query_execute(node, loc_dbconn, &query1) < 0) */ +/* { */ +/* PQclear(res1); */ +/* slon_disconnectdb(pro_conn); */ +/* dstring_free(&query1); */ +/* terminate_log_archive(); */ +/* return -1; */ +/* } */ +/* } */ +/* PQclear(res1); */ /* * And copy over the sequence last_value corresponding to the
- Previous message: [Slony1-commit] By cbbrowne: Add Debian packaging control files, contributed by Tim
- Next message: [Slony1-commit] By smsimms: Vary the sleep_seconds by up to 50% in either direction.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list