Thu Oct 27 20:20:46 PDT 2005
- Previous message: [Slony1-commit] By cbbrowne: The query looking at copyFields() return codes was looking
- Next message: [Slony1-commit] By cbbrowne: At start of COPY_SET, we run through all the tables to "see
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message:
-----------
Forward patching all the fixes for 1.1.2 into HEAD.
Jan
Modified Files:
--------------
slony1-engine/src/backend:
slony1_funcs.sql (r1.69 -> r1.70)
slony1-engine/src/ducttape:
compare_pgbench_dumps (r1.3 -> r1.4)
slony1-engine/src/slon:
remote_worker.c (r1.92 -> r1.93)
slony1-engine/src/slonik:
slonik.c (r1.48 -> r1.49)
-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.69
retrieving revision 1.70
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.69 -r1.70
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -940,44 +940,6 @@
perform @NAMESPACE@.terminateNodeConnections(
''_@CLUSTERNAME@_Node_'' || p_failed_node);
--- Note that the following code should all become obsolete in the wake
--- of the availability of RebuildListenEntries()...
-
-if false then
- -- ----
- -- Let every node that listens for something on the failed node
- -- listen for that on the backup node instead.
- -- ----
- for v_row in select * from @NAMESPACE@.sl_listen
- where li_provider = p_failed_node
- and li_receiver <> p_backup_node
- loop
- perform @NAMESPACE@.storeListen_int(v_row.li_origin,
- p_backup_node, v_row.li_receiver);
- end loop;
-
- -- ----
- -- Let the backup node listen for all events where the
- -- failed node did listen for it.
- -- ----
- for v_row in select li_origin, li_provider
- from @NAMESPACE@.sl_listen
- where li_receiver = p_failed_node
- and li_provider <> p_backup_node
- loop
- perform @NAMESPACE@.storeListen_int(v_row.li_origin,
- v_row.li_provider, p_backup_node);
- end loop;
-
- -- ----
- -- Remove all sl_listen entries that receive anything from the
- -- failed node.
- -- ----
- delete from @NAMESPACE@.sl_listen
- where li_provider = p_failed_node
- or li_receiver = p_failed_node;
-end if;
-
-- ----
-- Move the sets
-- ----
@@ -1009,12 +971,10 @@
loop
perform @NAMESPACE@.alterTableRestore(v_row2.tab_id);
end loop;
- end if;
update @NAMESPACE@.sl_set set set_origin = p_backup_node
where set_id = v_row.set_id;
- if p_backup_node = @NAMESPACE@.getLocalNodeId(''_@CLUSTERNAME@'') then
delete from @NAMESPACE@.sl_setsync
where ssy_setid = v_row.set_id;
@@ -1135,7 +1095,7 @@
'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake)
On the node that has the highest sequence number of the failed node,
-fake the FAILED_NODE event.';
+fake the FAILOVER_SET event.';
-- ----------------------------------------------------------------------
-- FUNCTION failoverSet_int (failed_node, backup_node, set_id)
@@ -1183,6 +1143,15 @@
loop
perform @NAMESPACE@.alterTableForReplication(v_row.tab_id);
end loop;
+ insert into @NAMESPACE@.sl_event
+ (ev_origin, ev_seqno, ev_timestamp,
+ ev_minxid, ev_maxxid, ev_xip,
+ ev_type, ev_data1, ev_data2, ev_data3)
+ values
+ (p_backup_node, "pg_catalog".nextval(''@NAMESPACE@.sl_event_seq''), CURRENT_TIMESTAMP,
+ ''0'', ''0'', '''',
+ ''ACCEPT_SET'', p_set_id::text,
+ p_failed_node::text, p_backup_node::text);
else
delete from @NAMESPACE@.sl_subscribe
where sub_set = p_set_id
@@ -3483,10 +3452,9 @@
end if;
-- ----
- -- Restore all original triggers and rules
+ -- Restore all original triggers and rules of all sets
-- ----
for v_row in select * from @NAMESPACE@.sl_table
- where tab_set = p_set_id
loop
perform @NAMESPACE@.alterTableRestore(v_row.tab_id);
end loop;
@@ -3500,7 +3468,6 @@
-- Put all tables back into replicated mode
-- ----
for v_row in select * from @NAMESPACE@.sl_table
- where tab_set = p_set_id
loop
perform @NAMESPACE@.alterTableForReplication(v_row.tab_id);
end loop;
@@ -5104,7 +5071,7 @@
p_old alias for $1;
begin
-- upgrade sl_table
- if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+ if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
-- Add new column(s) sl_table.tab_relname, sl_table.tab_nspname
execute ''alter table @NAMESPACE@.sl_table add column tab_relname name'';
execute ''alter table @NAMESPACE@.sl_table add column tab_nspname name'';
@@ -5123,7 +5090,7 @@
end if;
-- upgrade sl_sequence
- if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+ if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
-- Add new column(s) sl_sequence.seq_relname, sl_sequence.seq_nspname
execute ''alter table @NAMESPACE@.sl_sequence add column seq_relname name'';
execute ''alter table @NAMESPACE@.sl_sequence add column seq_nspname name'';
@@ -5143,7 +5110,7 @@
-- ----
-- Changes from 1.0.x to 1.1.0
-- ----
- if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+ if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
-- Add new column sl_node.no_spool for virtual spool nodes
execute ''alter table @NAMESPACE@.sl_node add column no_spool boolean'';
update @NAMESPACE@.sl_node set no_spool = false;
Index: compare_pgbench_dumps
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/compare_pgbench_dumps,v
retrieving revision 1.3
retrieving revision 1.4
diff -Lsrc/ducttape/compare_pgbench_dumps -Lsrc/ducttape/compare_pgbench_dumps -u -w -r1.3 -r1.4
--- src/ducttape/compare_pgbench_dumps
+++ src/ducttape/compare_pgbench_dumps
@@ -8,7 +8,7 @@
DB2=$2 # Second database
echo -n "**** comparing databases $DB1 and $DB2 ... "
-psql $DB1 -o dump.tmp.1.$$ <<_EOF_
+psql -o dump.tmp.1.$$ $DB1 <<_EOF_
select 'accounts:'::text, aid, bid, abalance, filler
from accounts order by aid;
select 'branches:'::text, bid, bbalance, filler
@@ -18,7 +18,7 @@
select 'history:'::text, tid, bid, aid, delta, mtime, filler,
"_Slony-I_T1_rowID" from history order by "_Slony-I_T1_rowID";
_EOF_
-psql $DB2 -o dump.tmp.2.$$ <<_EOF_
+psql -o dump.tmp.2.$$ $DB2 <<_EOF_
select 'accounts:'::text, aid, bid, abalance, filler
from accounts order by aid;
select 'branches:'::text, bid, bbalance, filler
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.92
retrieving revision 1.93
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.92 -r1.93
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -1061,7 +1061,7 @@
slon_log(SLON_DEBUG2, "got parms ACCEPT_SET\n");
/* If we're a remote node, and haven't yet
- * received the MOVE_SET event from the
+ * received the MOVE/FAILOVER_SET event from the
* new origin, then we'll need to sleep a
* bit... This avoids a race condition
* where new SYNCs take place on the new
@@ -1071,49 +1071,50 @@
* received and processed */
if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
- slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin - wait...\n");
- slon_mkquery(&query1,
- "select 1 from %s.sl_event accept "
- "where "
- " accept.ev_type = 'ACCEPT_SET' and "
- " accept.ev_origin = %d and "
- " accept.ev_data1 = %d and "
- " accept.ev_data2 = %d and "
- " accept.ev_data3 = %d and "
- " not exists "
- " (select 1 from %s.sl_event move "
+ slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin\n");
+ slon_mkquery(&query2,
+ "select 1 from %s.sl_event "
" where "
- " accept.ev_origin = move.ev_data3 and "
- " move.ev_type = 'MOVE_SET' and "
- " move.ev_data1 = accept.ev_data1 and "
- " move.ev_data2 = accept.ev_data2 and "
- " move.ev_data3 = accept.ev_data3); ",
+ " (ev_origin = %d and "
+ " ev_type = 'MOVE_SET' and "
+ " ev_data1 = '%d' and "
+ " ev_data2 = '%d' and "
+ " ev_data3 = '%d') "
+ "or "
+ " (ev_origin = %d and "
+ " ev_type = 'FAILOVER_SET' and "
+ " ev_data1 = '%d' and "
+ " ev_data2 = '%d' and "
+ " ev_data3 = '%d'); ",
rtcfg_namespace,
old_origin, set_id, old_origin, new_origin,
- rtcfg_namespace);
- res = PQexec(local_dbconn, dstring_data(&query1));
- while (PQntuples(res) > 0) {
- int sleeptime = 15;
- int sched_rc;
- slon_log(SLON_WARN, "remoteWorkerThread_%d: "
- "accept set: node has not yet received MOVE_SET event "
- "for set %d old origin %d new origin - sleep %d seconds\n",
- rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime);
- sched_rc = sched_msleep(node, sleeptime * 1000);
- if (sched_rc != SCHED_STATUS_OK) {
- event_ok = false;
- break;
- } else {
- if (sleeptime < 60)
- sleeptime *= 2;
- }
- res = PQexec(local_dbconn, dstring_data(&query1));
+ old_origin, old_origin, new_origin, set_id);
+
+ res = PQexec(local_dbconn, dstring_data(&query2));
+ while (PQntuples(res) == 0)
+ {
+ slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET not received yet - sleep\n");
+ if (sched_msleep(node, 10000) != SCHED_STATUS_OK)
+ slon_abort();
+ PQclear(res);
+ res = PQexec(local_dbconn, dstring_data(&query2));
}
+ PQclear(res);
+ slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET exists - done\n");
+
+ slon_appendquery(&query1,
+ "notify \"_%s_Restart\"; ",
+ rtcfg_cluster_name);
+ query_append_event(&query1, event);
+ slon_appendquery(&query1, "commit transaction;");
+ query_execute(node, local_dbconn, &query1);
+ slon_abort();
+
+ need_reloadListen = true;
} else {
slon_log(SLON_DEBUG2, "ACCEPT_SET - on origin node...\n");
}
- slon_log(SLON_DEBUG2, "ACCEPT_SET - done...\n");
}
else if (strcmp(event->ev_type, "MOVE_SET") == 0)
@@ -1292,13 +1293,12 @@
* Data copy for new enabled set has failed. Rollback
* the transaction, sleep and try again.
*/
- if (query_execute(node, local_dbconn, &query2) < 0)
- slon_abort();
-
slon_log(SLON_WARN, "remoteWorkerThread_%d: "
"data copy for set %d failed - "
"sleep %d seconds\n",
node->no_id, sub_set, sleeptime);
+ if (query_execute(node, local_dbconn, &query2) < 0)
+ slon_abort();
sched_rc = sched_msleep(node, sleeptime * 1000);
if (sched_rc != SCHED_STATUS_OK)
{
@@ -2461,6 +2461,7 @@
pro_dbconn = pro_conn->dbconn;
loc_dbconn = local_conn->dbconn;
dstring_init(&query1);
+ dstring_init(&query2);
dstring_init(&query3);
dstring_init(&indexregenquery);
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
@@ -2478,6 +2479,9 @@
node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2488,6 +2492,9 @@
node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2503,6 +2510,9 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2533,6 +2543,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2544,6 +2557,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2558,6 +2574,9 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2597,6 +2616,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2631,6 +2653,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2655,6 +2680,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2706,6 +2734,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2729,6 +2760,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2763,6 +2797,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2801,6 +2838,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2825,6 +2865,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2846,6 +2889,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2882,6 +2928,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2903,6 +2952,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2918,6 +2970,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2932,7 +2987,6 @@
"Begin COPY of table %s\n",
node->no_id, tab_fqname);
- dstring_init(&query2);
slon_mkquery(&query2, "select %s.copyFields(%d);",
rtcfg_namespace, tab_id);
@@ -2946,6 +3000,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2964,6 +3021,8 @@
slon_disconnectdb(pro_conn);
dstring_free(&query1);
dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -2993,6 +3052,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3008,6 +3070,9 @@
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3036,6 +3101,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3064,6 +3132,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3083,6 +3154,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
@@ -3104,6 +3178,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3125,6 +3202,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3142,6 +3222,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3156,6 +3239,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3214,6 +3300,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3236,6 +3325,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3255,6 +3347,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3273,64 +3368,6 @@
gettimeofday(&tv_start2, NULL);
/*
- * Copy the sequences contained in the set
- */
-
- /* The copy of sequences is being done earlier, before we
- * start doing tables, so that if anything is missing, that is
- * noticed BEFORE 8 hours of copying of data takes place... */
-
-/* slon_mkquery(&query1, */
-/* "select SQ.seq_id, " */
-/* " \"pg_catalog\".quote_ident(PGN.nspname) || '.' || " */
-/* " \"pg_catalog\".quote_ident(PGC.relname), " */
-/* " SQ.seq_comment " */
-/* " from %s.sl_sequence SQ, " */
-/* " \"pg_catalog\".pg_class PGC, " */
-/* " \"pg_catalog\".pg_namespace PGN " */
-/* " where SQ.seq_set = %d " */
-/* " and PGC.oid = SQ.seq_reloid " */
-/* " and PGN.oid = PGC.relnamespace; ", */
-/* rtcfg_namespace, set_id); */
-/* res1 = PQexec(pro_dbconn, dstring_data(&query1)); */
-/* if (PQresultStatus(res1) != PGRES_TUPLES_OK) */
-/* { */
-/* slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", */
-/* node->no_id, dstring_data(&query1), */
-/* PQresultErrorMessage(res1)); */
-/* PQclear(res1); */
-/* slon_disconnectdb(pro_conn); */
-/* dstring_free(&query1); */
-/* terminate_log_archive(); */
-/* return -1; */
-/* } */
-/* ntuples1 = PQntuples(res1); */
-/* for (tupno1 = 0; tupno1 < ntuples1; tupno1++) */
-/* { */
-/* char *seq_id = PQgetvalue(res1, tupno1, 0); */
-/* char *seq_fqname = PQgetvalue(res1, tupno1, 1); */
-/* char *seq_comment = PQgetvalue(res1, tupno1, 2); */
-
-/* slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " */
-/* "copy sequence %s\n", */
-/* node->no_id, seq_fqname); */
-
-/* slon_mkquery(&query1, */
-/* "select %s.setAddSequence_int(%d, %s, '%q', '%q')", */
-/* rtcfg_namespace, set_id, seq_id, */
-/* seq_fqname, seq_comment); */
-/* if (query_execute(node, loc_dbconn, &query1) < 0) */
-/* { */
-/* PQclear(res1); */
-/* slon_disconnectdb(pro_conn); */
-/* dstring_free(&query1); */
-/* terminate_log_archive(); */
-/* return -1; */
-/* } */
-/* } */
-/* PQclear(res1); */
-
- /*
* And copy over the sequence last_value corresponding to the
* ENABLE_SUBSCRIPTION event.
*/
@@ -3359,6 +3396,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3402,6 +3442,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3446,6 +3489,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3457,6 +3503,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3504,6 +3553,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3515,6 +3567,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3542,7 +3597,6 @@
"from %s.sl_log_2 where log_origin = %d and %s; ",
rtcfg_namespace, node->no_id, dstring_data(&query2),
rtcfg_namespace, node->no_id, dstring_data(&query2));
- dstring_free(&query2);
}
/*
@@ -3559,6 +3613,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3600,6 +3657,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3611,6 +3671,9 @@
PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3640,6 +3703,9 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3655,6 +3721,9 @@
node->no_id);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3673,6 +3742,9 @@
node->no_id, archive_tmp, strerror(errno));
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
@@ -3688,11 +3760,17 @@
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
terminate_log_archive();
return -1;
}
slon_disconnectdb(pro_conn);
dstring_free(&query1);
+ dstring_free(&query2);
+ dstring_free(&query3);
+ dstring_free(&indexregenquery);
slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
"disconnected from provider DB\n",
@@ -3768,6 +3846,15 @@
*/
for (provider = wd->provider_head; provider; provider = provider->next)
{
+ if (provider->conn != NULL)
+ {
+ if (PQstatus(provider->conn->dbconn) != CONNECTION_OK)
+ {
+ slon_disconnectdb(provider->conn);
+ provider->conn = NULL;
+ }
+ }
+
if (provider->conn == NULL)
{
if (provider->pa_conninfo == NULL)
@@ -3878,6 +3965,7 @@
PGresult *res2;
int ntuples2;
int tupno2;
+ int ntables_total = 0;
int added_or_to_provider = 0;
provider_qual = &(provider->helper_qualification);
@@ -3972,8 +4060,16 @@
if (ntuples2 == 0)
{
PQclear(res2);
+
+ if (!added_or_to_provider)
+ {
+ slon_appendquery(provider_qual, " ( false ) ");
+ added_or_to_provider = 1;
+ }
+
continue;
}
+ ntables_total += ntuples2;
/*
* ... and build up a query qualification that is
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.48
retrieving revision 1.49
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.48 -r1.49
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -2521,6 +2521,7 @@
typedef struct
{
int set_id;
+ int num_directsub;
int num_subscribers;
failnode_node **subscribers;
failnode_node *max_node;
@@ -2592,8 +2593,7 @@
" and S.set_origin = %d "
" and SUB.sub_provider = %d "
" and SUB.sub_active "
- " group by set_id "
- " having count(S.set_id) > 1",
+ " group by set_id ",
stmt->hdr.script->clustername,
stmt->hdr.script->clustername,
stmt->no_id, stmt->no_id);
@@ -2681,6 +2681,10 @@
for (i = 0; i < num_sets; i++)
{
setinfo[i].set_id = (int)strtol(PQgetvalue(res2, i, 0), NULL, 10);
+ setinfo[i].num_directsub = (int)strtol(PQgetvalue(res2, i, 1), NULL, 10);
+
+ if (setinfo[i].num_directsub <= 1)
+ continue;
slon_mkquery(&query,
"select sub_receiver "
@@ -2857,6 +2861,34 @@
setinfo[i].max_node = NULL;
setinfo[i].max_seqno = 0;
+ if (setinfo[i].num_directsub <= 1)
+ {
+ int64 ev_seqno;
+
+ slon_mkquery(&query,
+ "select max(ev_seqno) "
+ " from \"_%s\".sl_event "
+ " where ev_origin = %d "
+ " and ev_type = 'SYNC'; ",
+ stmt->hdr.script->clustername,
+ stmt->no_id);
+ res1 = db_exec_select((SlonikStmt *) stmt,
+ adminfo1, &query);
+ if (res1 == NULL)
+ {
+ free(configbuf);
+ dstring_free(&query);
+ return -1;
+ }
+ slon_scanint64(PQgetvalue(res1, 0, 0), &ev_seqno);
+
+ setinfo[i].max_seqno = ev_seqno;
+
+ PQclear(res1);
+
+ continue;
+ }
+
slon_mkquery(&query,
"select ssy_seqno "
" from \"_%s\".sl_setsync "
@@ -2920,7 +2952,12 @@
int use_node;
SlonikAdmInfo *use_adminfo;
- if (setinfo[i].max_node == NULL)
+ if (setinfo[i].num_directsub <= 1)
+ {
+ use_node = stmt->backup_node;
+ use_adminfo = adminfo1;
+ }
+ else if (setinfo[i].max_node == NULL)
{
printf("no setsync status for set %d found at all\n",
setinfo[i].set_id);
- Previous message: [Slony1-commit] By cbbrowne: The query looking at copyFields() return codes was looking
- Next message: [Slony1-commit] By cbbrowne: At start of COPY_SET, we run through all the tables to "see
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list