CVS User Account cvsuser
Thu Oct 27 20:20:46 PDT 2005
Log Message:
-----------
Forward patching all the fixes for 1.1.2 into HEAD.

Jan

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_funcs.sql (r1.69 -> r1.70)
    slony1-engine/src/ducttape:
        compare_pgbench_dumps (r1.3 -> r1.4)
    slony1-engine/src/slon:
        remote_worker.c (r1.92 -> r1.93)
    slony1-engine/src/slonik:
        slonik.c (r1.48 -> r1.49)

-------------- next part --------------
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.69
retrieving revision 1.70
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.69 -r1.70
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -940,44 +940,6 @@
 	perform @NAMESPACE at .terminateNodeConnections(
 			''_ at CLUSTERNAME@_Node_'' || p_failed_node);
 
--- Note that the following code should all become obsolete in the wake
--- of the availability of RebuildListenEntries()...
-
-if false then
-	-- ----
-	-- Let every node that listens for something on the failed node
-	-- listen for that on the backup node instead.
-	-- ----
-	for v_row in select * from @NAMESPACE at .sl_listen
-			where li_provider = p_failed_node
-				and li_receiver <> p_backup_node
-	loop
-		perform @NAMESPACE at .storeListen_int(v_row.li_origin,
-				p_backup_node, v_row.li_receiver);
-	end loop;
-
-	-- ----
-	-- Let the backup node listen for all events where the
-	-- failed node did listen for it.
-	-- ----
-	for v_row in select li_origin, li_provider
-			from @NAMESPACE at .sl_listen
-			where li_receiver = p_failed_node
-				and li_provider <> p_backup_node
-	loop
-		perform @NAMESPACE at .storeListen_int(v_row.li_origin,
-				v_row.li_provider, p_backup_node);
-	end loop;
-
-	-- ----
-	-- Remove all sl_listen entries that receive anything from the
-	-- failed node.
-	-- ----
-	delete from @NAMESPACE at .sl_listen
-			where li_provider = p_failed_node
-				or li_receiver = p_failed_node;
-end if;
-
 	-- ----
 	-- Move the sets
 	-- ----
@@ -1009,12 +971,10 @@
 				loop
 					perform @NAMESPACE at .alterTableRestore(v_row2.tab_id);
 				end loop;
-			end if;
 
 			update @NAMESPACE at .sl_set set set_origin = p_backup_node
 					where set_id = v_row.set_id;
 
-			if p_backup_node = @NAMESPACE at .getLocalNodeId(''_ at CLUSTERNAME@'') then
 				delete from @NAMESPACE at .sl_setsync
 						where ssy_setid = v_row.set_id;
 
@@ -1135,7 +1095,7 @@
 'FUNCTION failedNode2 (failed_node, backup_node, set_id, ev_seqno, ev_seqfake)
 
 On the node that has the highest sequence number of the failed node,
-fake the FAILED_NODE event.';
+fake the FAILOVER_SET event.';
 
 -- ----------------------------------------------------------------------
 -- FUNCTION failoverSet_int (failed_node, backup_node, set_id)
@@ -1183,6 +1143,15 @@
 		loop
 			perform @NAMESPACE at .alterTableForReplication(v_row.tab_id);
 		end loop;
+		insert into @NAMESPACE at .sl_event
+				(ev_origin, ev_seqno, ev_timestamp,
+				ev_minxid, ev_maxxid, ev_xip,
+				ev_type, ev_data1, ev_data2, ev_data3)
+				values
+				(p_backup_node, "pg_catalog".nextval(''@NAMESPACE at .sl_event_seq''), CURRENT_TIMESTAMP,
+				''0'', ''0'', '''',
+				''ACCEPT_SET'', p_set_id::text,
+				p_failed_node::text, p_backup_node::text);
 	else
 		delete from @NAMESPACE at .sl_subscribe
 				where sub_set = p_set_id
@@ -3483,10 +3452,9 @@
 	end if;
 
 	-- ----
-	-- Restore all original triggers and rules
+	-- Restore all original triggers and rules of all sets
 	-- ----
 	for v_row in select * from @NAMESPACE at .sl_table
-			where tab_set = p_set_id
 	loop
 		perform @NAMESPACE at .alterTableRestore(v_row.tab_id);
 	end loop;
@@ -3500,7 +3468,6 @@
 	-- Put all tables back into replicated mode
 	-- ----
 	for v_row in select * from @NAMESPACE at .sl_table
-			where tab_set = p_set_id
 	loop
 		perform @NAMESPACE at .alterTableForReplication(v_row.tab_id);
 	end loop;
@@ -5104,7 +5071,7 @@
         p_old   alias for $1;
 begin
 	-- upgrade sl_table
-	if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+	if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
 		-- Add new column(s) sl_table.tab_relname, sl_table.tab_nspname
 		execute ''alter table @NAMESPACE at .sl_table add column tab_relname name'';
 		execute ''alter table @NAMESPACE at .sl_table add column tab_nspname name'';
@@ -5123,7 +5090,7 @@
 	end if;
 
 	-- upgrade sl_sequence
-	if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+	if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
 		-- Add new column(s) sl_sequence.seq_relname, sl_sequence.seq_nspname
 		execute ''alter table @NAMESPACE at .sl_sequence add column seq_relname name'';
 		execute ''alter table @NAMESPACE at .sl_sequence add column seq_nspname name'';
@@ -5143,7 +5110,7 @@
 	-- ----
 	-- Changes from 1.0.x to 1.1.0
 	-- ----
-	if p_old = ''1.0.2'' or p_old = ''1.0.5'' then
+	if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'') then
 		-- Add new column sl_node.no_spool for virtual spool nodes
 		execute ''alter table @NAMESPACE at .sl_node add column no_spool boolean'';
 		update @NAMESPACE at .sl_node set no_spool = false;
Index: compare_pgbench_dumps
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/compare_pgbench_dumps,v
retrieving revision 1.3
retrieving revision 1.4
diff -Lsrc/ducttape/compare_pgbench_dumps -Lsrc/ducttape/compare_pgbench_dumps -u -w -r1.3 -r1.4
--- src/ducttape/compare_pgbench_dumps
+++ src/ducttape/compare_pgbench_dumps
@@ -8,7 +8,7 @@
 DB2=$2    # Second database
 
 echo -n "**** comparing databases $DB1 and $DB2 ... "
-psql $DB1 -o dump.tmp.1.$$ <<_EOF_
+psql -o dump.tmp.1.$$ $DB1 <<_EOF_
 	select 'accounts:'::text, aid, bid, abalance, filler
 			from accounts order by aid;
 	select 'branches:'::text, bid, bbalance, filler
@@ -18,7 +18,7 @@
 	select 'history:'::text, tid, bid, aid, delta, mtime, filler,
 			"_Slony-I_T1_rowID" from history order by "_Slony-I_T1_rowID";
 _EOF_
-psql $DB2 -o dump.tmp.2.$$ <<_EOF_
+psql -o dump.tmp.2.$$ $DB2 <<_EOF_
 	select 'accounts:'::text, aid, bid, abalance, filler
 			from accounts order by aid;
 	select 'branches:'::text, bid, bbalance, filler
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.92
retrieving revision 1.93
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.92 -r1.93
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -1061,7 +1061,7 @@
 				slon_log(SLON_DEBUG2, "got parms ACCEPT_SET\n");
 				
 			    /* If we're a remote node, and haven't yet
-			     * received the MOVE_SET event from the
+			     * received the MOVE/FAILOVER_SET event from the
 			     * new origin, then we'll need to sleep a
 			     * bit...  This avoids a race condition
 			     * where new SYNCs take place on the new
@@ -1071,49 +1071,50 @@
 			     * received and processed  */
 
 			    if ((rtcfg_nodeid != old_origin) && (rtcfg_nodeid != new_origin)) {
-				    slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin - wait...\n");
-				    slon_mkquery(&query1, 
-						 "select 1 from %s.sl_event accept "
-						 "where "
-						 "   accept.ev_type = 'ACCEPT_SET' and "
-						 "   accept.ev_origin = %d and "
-						 "   accept.ev_data1 = %d and "
-						 "   accept.ev_data2 = %d and "
-						 "   accept.ev_data3 = %d and "
-						 "   not exists  "
-						 "   (select 1 from %s.sl_event move "
+				    slon_log(SLON_DEBUG2, "ACCEPT_SET - node not origin\n");
+				    slon_mkquery(&query2, 
+						 "select 1 from %s.sl_event "
 						 "    where "
-						 "      accept.ev_origin = move.ev_data3 and "
-						 "      move.ev_type = 'MOVE_SET' and "
-						 "      move.ev_data1 = accept.ev_data1 and "
-						 "      move.ev_data2 = accept.ev_data2 and "
-						 "      move.ev_data3 = accept.ev_data3); ",
+						 "     (ev_origin = %d and "
+						 "      ev_type = 'MOVE_SET' and "
+						 "      ev_data1 = '%d' and "
+						 "      ev_data2 = '%d' and "
+						 "      ev_data3 = '%d') "
+						 "or "
+						 "     (ev_origin = %d and "
+						 "      ev_type = 'FAILOVER_SET' and "
+						 "      ev_data1 = '%d' and "
+						 "      ev_data2 = '%d' and "
+						 "      ev_data3 = '%d'); ",
 						 
 						 rtcfg_namespace, 
 						 old_origin, set_id, old_origin, new_origin,
-						 rtcfg_namespace);
-				    res = PQexec(local_dbconn, dstring_data(&query1));
-				    while (PQntuples(res) > 0) {
-					    int sleeptime = 15;
-					    int sched_rc;
-					    slon_log(SLON_WARN, "remoteWorkerThread_%d: "
-					     "accept set: node has not yet received MOVE_SET event "
-						     "for set %d old origin %d new origin - sleep %d seconds\n",
-						     rtcfg_nodeid, set_id, old_origin, new_origin, sleeptime);
-					    sched_rc = sched_msleep(node, sleeptime * 1000);
-					    if (sched_rc != SCHED_STATUS_OK) {
-						    event_ok = false;
-						    break;
-					    } else {
-						    if (sleeptime < 60)
-							    sleeptime *= 2;
-					    }
-					    res = PQexec(local_dbconn, dstring_data(&query1));
+						 old_origin, old_origin, new_origin, set_id);
+
+				    res = PQexec(local_dbconn, dstring_data(&query2));
+					while (PQntuples(res) == 0)
+					{
+						slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET not received yet - sleep\n");
+						if (sched_msleep(node, 10000) != SCHED_STATUS_OK)
+							slon_abort();
+						PQclear(res);
+						res = PQexec(local_dbconn, dstring_data(&query2));
 				    }
+					PQclear(res);
+					slon_log(SLON_DEBUG2, "ACCEPT_SET - MOVE_SET or FAILOVER_SET exists - done\n");
+
+					slon_appendquery(&query1,
+									 "notify \"_%s_Restart\"; ",
+									 rtcfg_cluster_name);
+					query_append_event(&query1, event);
+					slon_appendquery(&query1, "commit transaction;");
+					query_execute(node, local_dbconn, &query1);
+					slon_abort();
+
+					need_reloadListen = true;
 			    } else {
 				    slon_log(SLON_DEBUG2, "ACCEPT_SET - on origin node...\n");
 			    }
-			    slon_log(SLON_DEBUG2, "ACCEPT_SET - done...\n");
 
 			}
 			else if (strcmp(event->ev_type, "MOVE_SET") == 0)
@@ -1292,13 +1293,12 @@
 						 * Data copy for new enabled set has failed. Rollback
 						 * the transaction, sleep and try again.
 						 */
-						if (query_execute(node, local_dbconn, &query2) < 0)
-							slon_abort();
-
 						slon_log(SLON_WARN, "remoteWorkerThread_%d: "
 								 "data copy for set %d failed - "
 								 "sleep %d seconds\n",
 								 node->no_id, sub_set, sleeptime);
+						if (query_execute(node, local_dbconn, &query2) < 0)
+							slon_abort();
 						sched_rc = sched_msleep(node, sleeptime * 1000);
 						if (sched_rc != SCHED_STATUS_OK)
 						{
@@ -2461,6 +2461,7 @@
 	pro_dbconn = pro_conn->dbconn;
 	loc_dbconn = local_conn->dbconn;
 	dstring_init(&query1);
+	dstring_init(&query2);
 	dstring_init(&query3);
 	dstring_init(&indexregenquery);
 	sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
@@ -2478,6 +2479,9 @@
 				 node->no_id, archive_tmp, strerror(errno));
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2488,6 +2492,9 @@
 				 node->no_id, archive_tmp, strerror(errno));
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2503,6 +2510,9 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -2533,6 +2543,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2544,6 +2557,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2558,6 +2574,9 @@
 		{
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2597,6 +2616,9 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -2631,6 +2653,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2655,6 +2680,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -2706,6 +2734,9 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -2729,6 +2760,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2763,6 +2797,9 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -2801,6 +2838,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2825,6 +2865,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -2846,6 +2889,9 @@
 					PQclear(res1);
 					slon_disconnectdb(pro_conn);
 					dstring_free(&query1);
+					dstring_free(&query2);
+					dstring_free(&query3);
+					dstring_free(&indexregenquery);
 					terminate_log_archive();
 					return -1;
 				}
@@ -2882,6 +2928,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2903,6 +2952,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2918,6 +2970,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -2932,7 +2987,6 @@
 			 "Begin COPY of table %s\n",
 			 node->no_id, tab_fqname);
 
-		dstring_init(&query2);
 		slon_mkquery(&query2, "select %s.copyFields(%d);",
 			     rtcfg_namespace, tab_id);
 
@@ -2946,6 +3000,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2964,6 +3021,8 @@
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
 			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -2993,6 +3052,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3008,6 +3070,9 @@
 
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -3036,6 +3101,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3064,6 +3132,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -3083,6 +3154,9 @@
 					PQclear(res1);
 					slon_disconnectdb(pro_conn);
 					dstring_free(&query1);
+					dstring_free(&query2);
+					dstring_free(&query3);
+					dstring_free(&indexregenquery);
 					terminate_log_archive();
 					return -1;
 					
@@ -3104,6 +3178,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3125,6 +3202,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3142,6 +3222,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3156,6 +3239,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3214,6 +3300,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3236,6 +3325,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3255,6 +3347,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3273,64 +3368,6 @@
 	gettimeofday(&tv_start2, NULL);
 
 	/*
-	 * Copy the sequences contained in the set
-	 */
-
-	/* The copy of sequences is being done earlier, before we
-	 * start doing tables, so that if anything is missing, that is
-	 * noticed BEFORE 8 hours of copying of data takes place... */
-
-/* 	slon_mkquery(&query1, */
-/* 		     "select SQ.seq_id, " */
-/* 		     "		\"pg_catalog\".quote_ident(PGN.nspname) || '.' || " */
-/* 		     "		\"pg_catalog\".quote_ident(PGC.relname), " */
-/* 		     "		SQ.seq_comment " */
-/* 		     "	from %s.sl_sequence SQ, " */
-/* 		     "		\"pg_catalog\".pg_class PGC, " */
-/* 		     "		\"pg_catalog\".pg_namespace PGN " */
-/* 		     "	where SQ.seq_set = %d " */
-/* 		     "		and PGC.oid = SQ.seq_reloid " */
-/* 		     "		and PGN.oid = PGC.relnamespace; ", */
-/* 		     rtcfg_namespace, set_id); */
-/* 	res1 = PQexec(pro_dbconn, dstring_data(&query1)); */
-/* 	if (PQresultStatus(res1) != PGRES_TUPLES_OK) */
-/* 	{ */
-/* 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", */
-/* 			 node->no_id, dstring_data(&query1), */
-/* 			 PQresultErrorMessage(res1)); */
-/* 		PQclear(res1); */
-/* 		slon_disconnectdb(pro_conn); */
-/* 		dstring_free(&query1); */
-/* 		terminate_log_archive(); */
-/* 		return -1; */
-/* 	} */
-/* 	ntuples1 = PQntuples(res1); */
-/* 	for (tupno1 = 0; tupno1 < ntuples1; tupno1++) */
-/* 	{ */
-/* 		char	   *seq_id = PQgetvalue(res1, tupno1, 0); */
-/* 		char	   *seq_fqname = PQgetvalue(res1, tupno1, 1); */
-/* 		char	   *seq_comment = PQgetvalue(res1, tupno1, 2); */
-
-/* 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " */
-/* 			 "copy sequence %s\n", */
-/* 			 node->no_id, seq_fqname); */
-
-/* 		slon_mkquery(&query1, */
-/* 			     "select %s.setAddSequence_int(%d, %s, '%q', '%q')", */
-/* 			     rtcfg_namespace, set_id, seq_id, */
-/* 			     seq_fqname, seq_comment); */
-/* 		if (query_execute(node, loc_dbconn, &query1) < 0) */
-/* 		{ */
-/* 			PQclear(res1); */
-/* 			slon_disconnectdb(pro_conn); */
-/* 			dstring_free(&query1); */
-/* 			terminate_log_archive(); */
-/* 			return -1; */
-/* 		} */
-/* 	} */
-/* 	PQclear(res1); */
-
-	/*
 	 * And copy over the sequence last_value corresponding to the
 	 * ENABLE_SUBSCRIPTION event.
 	 */
@@ -3359,6 +3396,9 @@
 		PQclear(res1);
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -3402,6 +3442,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3446,6 +3489,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3457,6 +3503,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3504,6 +3553,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -3515,6 +3567,9 @@
 				PQclear(res1);
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
+				dstring_free(&query2);
+				dstring_free(&query3);
+				dstring_free(&indexregenquery);
 				terminate_log_archive();
 				return -1;
 			}
@@ -3542,7 +3597,6 @@
 				     "from %s.sl_log_2 where log_origin = %d and %s; ",
 				     rtcfg_namespace, node->no_id, dstring_data(&query2),
 				     rtcfg_namespace, node->no_id, dstring_data(&query2));
-			dstring_free(&query2);
 		}
 
 		/*
@@ -3559,6 +3613,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3600,6 +3657,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3611,6 +3671,9 @@
 			PQclear(res1);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3640,6 +3703,9 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
@@ -3655,6 +3721,9 @@
 				 node->no_id);
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3673,6 +3742,9 @@
 				 node->no_id, archive_tmp, strerror(errno));
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
+			dstring_free(&query2);
+			dstring_free(&query3);
+			dstring_free(&indexregenquery);
 			terminate_log_archive();
 			return -1;
 		}
@@ -3688,11 +3760,17 @@
 	{
 		slon_disconnectdb(pro_conn);
 		dstring_free(&query1);
+		dstring_free(&query2);
+		dstring_free(&query3);
+		dstring_free(&indexregenquery);
 		terminate_log_archive();
 		return -1;
 	}
 	slon_disconnectdb(pro_conn);
 	dstring_free(&query1);
+	dstring_free(&query2);
+	dstring_free(&query3);
+	dstring_free(&indexregenquery);
 
 	slon_log(SLON_DEBUG1, "remoteWorkerThread_%d: "
 		 "disconnected from provider DB\n",
@@ -3768,6 +3846,15 @@
 	 */
 	for (provider = wd->provider_head; provider; provider = provider->next)
 	{
+		if (provider->conn != NULL)
+		{
+			if (PQstatus(provider->conn->dbconn) != CONNECTION_OK)
+			{
+				slon_disconnectdb(provider->conn);
+				provider->conn = NULL;
+			}
+		}
+
 		if (provider->conn == NULL)
 		{
 			if (provider->pa_conninfo == NULL)
@@ -3878,6 +3965,7 @@
 		PGresult   *res2;
 		int			ntuples2;
 		int			tupno2;
+		int			ntables_total = 0;
 		int			added_or_to_provider = 0;
 
 		provider_qual = &(provider->helper_qualification);
@@ -3972,8 +4060,16 @@
 			if (ntuples2 == 0)
 			{
 				PQclear(res2);
+
+				if (!added_or_to_provider)
+				{
+					slon_appendquery(provider_qual, " ( false ) ");
+					added_or_to_provider = 1;
+				}
+
 				continue;
 			}
+			ntables_total += ntuples2;
 
 			/*
 			 * ... and build up a query qualification that is
Index: slonik.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slonik/slonik.c,v
retrieving revision 1.48
retrieving revision 1.49
diff -Lsrc/slonik/slonik.c -Lsrc/slonik/slonik.c -u -w -r1.48 -r1.49
--- src/slonik/slonik.c
+++ src/slonik/slonik.c
@@ -2521,6 +2521,7 @@
 typedef struct
 {
 	int			set_id;
+	int			num_directsub;
 	int			num_subscribers;
 	failnode_node **subscribers;
 	failnode_node *max_node;
@@ -2592,8 +2593,7 @@
 				 "    and S.set_origin = %d "
 				 "    and SUB.sub_provider = %d "
 				 "    and SUB.sub_active "
-				 "    group by set_id "
-				 "    having count(S.set_id) > 1",
+				 "    group by set_id ",
 				 stmt->hdr.script->clustername,
 				 stmt->hdr.script->clustername,
 				 stmt->no_id, stmt->no_id);
@@ -2681,6 +2681,10 @@
 	for (i = 0; i < num_sets; i++)
 	{
 		setinfo[i].set_id = (int)strtol(PQgetvalue(res2, i, 0), NULL, 10);
+		setinfo[i].num_directsub = (int)strtol(PQgetvalue(res2, i, 1), NULL, 10);
+
+		if (setinfo[i].num_directsub <= 1)
+			continue;
 
 		slon_mkquery(&query,
 					 "select sub_receiver "
@@ -2857,6 +2861,34 @@
 		setinfo[i].max_node = NULL;
 		setinfo[i].max_seqno = 0;
 
+		if (setinfo[i].num_directsub <= 1)
+		{
+			int64		ev_seqno;
+
+			slon_mkquery(&query,
+					"select max(ev_seqno) "
+					"	from \"_%s\".sl_event "
+					"	where ev_origin = %d "
+					"	and ev_type = 'SYNC'; ",
+					stmt->hdr.script->clustername,
+					stmt->no_id);
+			res1 = db_exec_select((SlonikStmt *) stmt,
+					adminfo1, &query);
+			if (res1 == NULL)
+			{
+				free(configbuf);
+				dstring_free(&query);
+				return -1;
+			}
+			slon_scanint64(PQgetvalue(res1, 0, 0), &ev_seqno);
+
+			setinfo[i].max_seqno = ev_seqno;
+
+			PQclear(res1);
+
+			continue;
+		}
+
 		slon_mkquery(&query,
 					 "select ssy_seqno "
 					 "    from \"_%s\".sl_setsync "
@@ -2920,7 +2952,12 @@
 		int			use_node;
 		SlonikAdmInfo *use_adminfo;
 
-		if (setinfo[i].max_node == NULL)
+		if (setinfo[i].num_directsub <= 1)
+		{
+			use_node = stmt->backup_node;
+			use_adminfo = adminfo1;
+		}
+		else if (setinfo[i].max_node == NULL)
 		{
 			printf("no setsync status for set %d found at all\n",
 				   setinfo[i].set_id);


More information about the Slony1-commit mailing list