subscribeset_int(p_sub_set integer, p_sub_provider integer, p_sub_receiver integer, p_sub_forward boolean, p_omit_copy boolean)

8.128. subscribeset_int(p_sub_set integer, p_sub_provider integer, p_sub_receiver integer, p_sub_forward boolean, p_omit_copy boolean)

Function Properties

Language: PLPGSQL

Return Type: integer

subscribeSet_int (sub_set, sub_provider, sub_receiver, sub_forward, omit_copy) Internal actions for subscribing receiver sub_receiver to subscription set sub_set.

-- ----------------------------------------------------------------------
-- subscribeSet_int (p_sub_set, p_sub_provider, p_sub_receiver,
--                   p_sub_forward, p_omit_copy)
--
-- Internal actions for subscribing receiver p_sub_receiver to
-- subscription set p_sub_set.
--
-- Two paths:
--   * An sl_subscribe row for (p_sub_set, p_sub_receiver) already
--     exists: its provider and forward flag are changed, and every other
--     subscription of the same receiver to a set with the same origin is
--     moved to the new provider as well.
--   * No such row exists: a new, inactive subscription is inserted (plus
--     an sl_path row if none exists).  On the set's origin node an
--     ENABLE_SUBSCRIPTION event is created and enableSubscription() is
--     called.
-- Both paths finish by calling RebuildListenEntries().
--
-- Parameters:
--   p_sub_set       id of the set; matched against sl_set.set_id
--   p_sub_provider  node stored as sl_subscribe.sub_provider
--   p_sub_receiver  node stored as sl_subscribe.sub_receiver
--   p_sub_forward   boolean stored as sl_subscribe.sub_forward
--   p_omit_copy     boolean; only passed on as 't'/'f' in the
--                   ENABLE_SUBSCRIPTION event, not otherwise used here
--
-- Returns: p_sub_set
--
-- Raises an exception when
--   * the set is not in sl_set,
--   * the receiver is the local node and its existing subscription to
--     the set is not active,
--   * the new provider is neither an active forwarding subscriber nor
--     the origin of another same-origin set the receiver subscribes to.
-- ----------------------------------------------------------------------
declare
	v_set_origin		int4;
	v_sub_row			record;
begin
	-- ----
	-- Grab the central configuration lock
	-- ----
	lock table sl_config_lock;

	-- ----
	-- Lookup the set origin
	-- ----
	select set_origin into v_set_origin
			from sl_set
			where set_id = p_sub_set;
	if not found then
		raise exception 'Slony-I: subscribeSet_int(): set % not found', p_sub_set;
	end if;

	-- ----
	-- Provider change is only allowed for active sets.
	-- The check is only made when the receiver is the local node; a
	-- missing subscription row is not an error here, it is handled by
	-- the insert path further down.
	-- ----
	if p_sub_receiver = getLocalNodeId('_schemadoc') then
		select sub_active into v_sub_row from sl_subscribe
				where sub_set = p_sub_set
				and sub_receiver = p_sub_receiver;
		if found then
			if not v_sub_row.sub_active then
				raise exception 'Slony-I: subscribeSet_int(): set % is not active, cannot change provider',
						p_sub_set;
			end if;
		end if;
	end if;

	-- ----
	-- Try to change provider and/or forward for an existing subscription.
	-- FOUND after the update tells whether such a subscription existed.
	-- ----
	update sl_subscribe
			set sub_provider = p_sub_provider,
				sub_forward = p_sub_forward
			where sub_set = p_sub_set
			and sub_receiver = p_sub_receiver;
	if found then

		-- ----
		-- This is changing a subscription. Make sure all sets from
		-- this origin are subscribed using the same data provider.
		-- For this we first check that the requested data provider
		-- is subscribed to all the sets, the receiver is subscribed to.
		-- ----
		for v_sub_row in select set_id from sl_set
				join sl_subscribe on set_id = sub_set
				where set_origin = v_set_origin
				and sub_receiver = p_sub_receiver
				and sub_set <> p_sub_set
		loop
			-- The new provider must either be an active, forwarding
			-- subscriber of this set or be the set's origin itself.
			-- The exception aborts the function, so the update above
			-- does not survive a failed check.
			if not exists (select 1 from sl_subscribe
					where sub_set = v_sub_row.set_id
					and sub_receiver = p_sub_provider
					and sub_active and sub_forward)
				and not exists (select 1 from sl_set
					where set_id = v_sub_row.set_id
					and set_origin = p_sub_provider)
			then
				raise exception 'Slony-I: subscribeSet_int(): node % is not a forwarding subscriber for set %',
						p_sub_provider, v_sub_row.set_id;
			end if;

			-- ----
			-- New data provider offers this set as well, change that
			-- subscription too.
			-- ----
			update sl_subscribe
					set sub_provider = p_sub_provider
					where sub_set = v_sub_row.set_id
					and sub_receiver = p_sub_receiver;
		end loop;

		-- ----
		-- Rewrite sl_listen table
		-- ----
		perform RebuildListenEntries();

		return p_sub_set;
	end if;

	-- ----
	-- Not found, insert a new one.
	-- If there is no sl_path row from provider to receiver yet, one is
	-- created with the conninfo '<event pending>'.
	-- NOTE(review): presumably a placeholder until the real path
	-- configuration arrives -- confirm.
	-- ----
	if not exists (select true from sl_path
			where pa_server = p_sub_provider
			and pa_client = p_sub_receiver)
	then
		insert into sl_path
				(pa_server, pa_client, pa_conninfo, pa_connretry)
				values 
				(p_sub_provider, p_sub_receiver, 
				'<event pending>', 10);
	end if;

	-- The new subscription starts out inactive (sub_active = false).
	insert into sl_subscribe
			(sub_set, sub_provider, sub_receiver, sub_forward, sub_active)
			values (p_sub_set, p_sub_provider, p_sub_receiver,
				p_sub_forward, false);

	-- ----
	-- If the set origin is here, then enable the subscription.
	-- The value returned by createEvent() is not needed, so the call is
	-- made with PERFORM instead of SELECT ... INTO a throwaway variable.
	-- ----
	if v_set_origin = getLocalNodeId('_schemadoc') then
		perform createEvent('_schemadoc', 'ENABLE_SUBSCRIPTION', 
				p_sub_set::text, p_sub_provider::text, p_sub_receiver::text, 
				case p_sub_forward when true then 't' else 'f' end,
				case p_omit_copy when true then 't' else 'f' end
				);
		perform enableSubscription(p_sub_set, 
				p_sub_provider, p_sub_receiver);
	end if;
	
	-- ----
	-- Rewrite sl_listen table
	-- ----
	perform RebuildListenEntries();

	return p_sub_set;
end;