check_unconfirmed_log()

8.32. check_unconfirmed_log()

Function Properties

Language: PLPGSQL

Return Type: boolean

--
-- check_unconfirmed_log()
--
--	For every node that is the origin of at least one set, find the
--	highest event seqno confirmed by the subscribers of that origin's
--	sets, fetch the txid snapshot of that event, and count the rows in
--	sl_log_1 / sl_log_2 whose transaction is not covered by it.
--
--	Returns true if at least one origin still has such log rows,
--	false otherwise.  Raises an exception (after examining all
--	origins) if the confirmed event could not be determined or
--	located for any origin.
--
declare
	v_rc		bool := false;		-- result: unpropagated log rows exist
	v_error		bool := false;		-- an inconsistency was seen for some origin
	v_origin	integer;			-- origin node currently being examined
	v_allconf	bigint;				-- lowest of the per-receiver max confirmed seqnos
	v_allsnap	txid_snapshot;		-- snapshot of the event v_allconf
	v_count		bigint;				-- log rows not covered by v_allsnap
begin
	--
	-- Loop over all nodes that are the origin of at least one set
	--
	for v_origin in select distinct set_origin as no_id
			from sl_set loop
		--
		-- Per origin determine which is the highest event seqno
		-- that is confirmed by all subscribers to any of the
		-- origin's sets.
		--
		-- NOTE(review): a subscriber without any sl_confirm row for
		-- this origin contributes no group and is therefore ignored
		-- by min() -- confirm that this is intended.
		--
		select into v_allconf min(max_seqno) from (
				select con_received, max(con_seqno) as max_seqno
					from sl_confirm
					where con_origin = v_origin
					and con_received in (
						select distinct sub.sub_receiver
							from sl_set as s
							inner join sl_subscribe as sub
								on s.set_id = sub.sub_set
							where s.set_origin = v_origin
						)
					group by con_received
			) as maxconfirmed;

		--
		-- An aggregate without GROUP BY always yields exactly one row,
		-- so FOUND is always true here.  "Nothing confirmed" shows up
		-- as a NULL result instead, which is what must be tested.
		--
		if v_allconf is null then
			raise NOTICE 'check_unconfirmed_log(): cannot determine highest ev_seqno for node % confirmed by all subscribers', v_origin;
			v_error := true;
			continue;
		end if;

		--
		-- Get the txid snapshot that corresponds with that event
		--
		select into v_allsnap ev_snapshot
			from sl_event
			where ev_origin = v_origin
			and ev_seqno = v_allconf;
		if not found then
			raise NOTICE 'check_unconfirmed_log(): cannot find event %,% in sl_event', v_origin, v_allconf;
			v_error := true;
			continue;
		end if;

		--
		-- Count the number of log rows that appeared after that event.
		-- A row counts if its txid is at or beyond the snapshot's xmax,
		-- or if its txid was still in progress (listed in xip) when the
		-- snapshot was taken.  The two conditions are mutually
		-- exclusive, so UNION ALL does not count any row twice.
		--
		select into v_count count(*) from (
			select 1 from sl_log_1
				where log_origin = v_origin
				and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap)
			union all
			select 1 from sl_log_1
				where log_origin = v_origin
				and log_txid in (
					select * from "pg_catalog".txid_snapshot_xip(v_allsnap)
				)
			union all
			select 1 from sl_log_2
				where log_origin = v_origin
				and log_txid >= "pg_catalog".txid_snapshot_xmax(v_allsnap)
			union all
			select 1 from sl_log_2
				where log_origin = v_origin
				and log_txid in (
					select * from "pg_catalog".txid_snapshot_xip(v_allsnap)
				)
		) as cnt;

		if v_count > 0 then
			raise NOTICE 'check_unconfirmed_log(): origin % has % log rows that have not propagated to all subscribers yet', v_origin, v_count;
			v_rc := true;
		end if;
	end loop;

	--
	-- Errors are collected so that every origin is reported before
	-- the function aborts.
	--
	if v_error then
		raise EXCEPTION 'check_unconfirmed_log(): aborting due to previous inconsistency';
	end if;

	return v_rc;
end;