Slony-I 2.2.10 Documentation | ||||
---|---|---|---|---|
Prev | Fast Backward | Chapter 8. Schema schemadoc | Fast Forward | Next |
8.138. upgradeschema(p_old text)
Function Properties
Language: PLPGSQL
Return Type: text
Called during "update functions" by slonik to perform schema changes

-- ----------------------------------------------------------------------
-- upgradeschema(p_old text) returns text
--
-- Brings the Slony-I schema objects in '_schemadoc' up to the layout this
-- version expects.  Each step first checks whether its change is already
-- present (via information_schema lookups or check_table_field_exists)
-- and only then applies it.
--
-- Parameters:
--   p_old  version string of the schema being upgraded from; only
--          inspected for a leading '1.'.
--
-- Returns:
--   p_old, unchanged.
--
-- Raises:
--   exception if p_old matches '1.%' (a separate upgrade path is required)
--   exception if sl_log_1 lacks log_cmdargs and check_unconfirmed_log()
--             returns true
-- ----------------------------------------------------------------------
declare
    v_tab_row    record;
    v_query      text;
    v_keepstatus text;
begin
    -- If old version is pre-2.0, then we require a special upgrade process
    if p_old like '1.%' then
        raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20';
    end if;

    perform upgradeSchemaAddTruncateTriggers();

    -- Change all Slony-I-defined columns that are "timestamp without time zone"
    -- to "timestamp *WITH* time zone"
    if exists (select 1
                 from information_schema.columns c
                where table_schema = '_schemadoc'
                  and data_type = 'timestamp without time zone'
                  and exists (select 1
                                from information_schema.tables t
                               where t.table_schema = c.table_schema
                                 and t.table_name = c.table_name
                                 and t.table_type = 'BASE TABLE')
                  and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'),
                                                        ('sl_event', 'ev_timestamp'),
                                                        ('sl_registry', 'reg_timestamp'),
                                                        ('sl_archive_counter', 'ac_timestamp')))
    then
        -- Preserve sl_status
        -- NOTE(review): presumably the view is dropped because it depends on
        -- the columns being retyped below -- confirm against its definition.
        select pg_get_viewdef('sl_status') into v_keepstatus;
        execute 'drop view sl_status';

        for v_tab_row in
            select table_schema, table_name, column_name
              from information_schema.columns c
             where table_schema = '_schemadoc'
               and data_type = 'timestamp without time zone'
               and exists (select 1
                             from information_schema.tables t
                            where t.table_schema = c.table_schema
                              and t.table_name = c.table_name
                              and t.table_type = 'BASE TABLE')
               and (table_name, column_name) in (('sl_confirm', 'con_timestamp'),
                                                 ('sl_event', 'ev_timestamp'),
                                                 ('sl_registry', 'reg_timestamp'),
                                                 ('sl_archive_counter', 'ac_timestamp'))
        loop
            raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name;
            -- table_name / column_name can only be one of the fixed pairs
            -- listed in the query above, so they are concatenated as-is.
            v_query := 'alter table ' || slon_quote_brute(v_tab_row.table_schema) || '.'
                       || v_tab_row.table_name || ' alter column ' || v_tab_row.column_name
                       || ' type timestamp with time zone;';
            execute v_query;
        end loop;

        -- restore sl_status
        execute 'create view sl_status as ' || v_keepstatus;
    end if;

    -- Create sl_components if it is not there yet
    if not exists (select 1
                     from information_schema.tables
                    where table_schema = '_schemadoc'
                      and table_name = 'sl_components')
    then
        v_query := '
            create table sl_components (
                co_actor text not null primary key,
                co_pid integer not null,
                co_node integer not null,
                co_connection_pid integer not null,
                co_activity text,
                co_starttime timestamptz not null,
                co_event bigint,
                co_eventtype text
            ) without oids;
        ';
        execute v_query;
    end if;

    -- Create sl_event_lock if it is not there yet
    if not exists (select 1
                     from information_schema.tables t
                    where table_schema = '_schemadoc'
                      and table_name = 'sl_event_lock')
    then
        v_query := 'create table sl_event_lock (dummy integer);';
        execute v_query;
    end if;

    -- Create sl_apply_stats if it is not there yet
    if not exists (select 1
                     from information_schema.tables t
                    where table_schema = '_schemadoc'
                      and table_name = 'sl_apply_stats')
    then
        v_query := '
            create table sl_apply_stats (
                as_origin int4,
                as_num_insert int8,
                as_num_update int8,
                as_num_delete int8,
                as_num_truncate int8,
                as_num_script int8,
                as_num_total int8,
                as_duration interval,
                as_apply_first timestamptz,
                as_apply_last timestamptz,
                as_cache_prepare int8,
                as_cache_hit int8,
                as_cache_evict int8,
                as_cache_prepare_max int8
            ) WITHOUT OIDS;';
        execute v_query;
    end if;

    --
    -- On the upgrade to 2.2, we change the layout of sl_log_N by
    -- adding columns log_tablenspname, log_tablerelname, and
    -- log_cmdupdncols as well as changing log_cmddata into
    -- log_cmdargs, which is a text array.
    --
    if not check_table_field_exists('_schemadoc', 'sl_log_1', 'log_cmdargs') then
        --
        -- Check that the cluster is completely caught up
        --
        if check_unconfirmed_log() then
            raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data';
        end if;

        --
        -- Drop tables sl_log_1 and sl_log_2
        --
        drop table sl_log_1;
        drop table sl_log_2;

        --
        -- Create the new sl_log_1
        --
        create table sl_log_1 (
            log_origin int4,
            log_txid bigint,
            log_tableid int4,
            log_actionseq int8,
            log_tablenspname text,
            log_tablerelname text,
            log_cmdtype "char",
            log_cmdupdncols int4,
            log_cmdargs text[]
        ) without oids;
        create index sl_log_1_idx1 on sl_log_1
            (log_origin, log_txid, log_actionseq);

        comment on table sl_log_1 is 'Stores each change to be propagated to subscriber nodes';
        comment on column sl_log_1.log_origin is 'Origin node from which the change came';
        comment on column sl_log_1.log_txid is 'Transaction ID on the origin node';
        comment on column sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect';
        comment on column sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas';
        comment on column sl_log_1.log_tablenspname is 'The schema name of the table affected';
        comment on column sl_log_1.log_tablerelname is 'The table name of the table affected';
        comment on column sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE';
        comment on column sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs';
        comment on column sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica';

        --
        -- Create the new sl_log_2
        --
        create table sl_log_2 (
            log_origin int4,
            log_txid bigint,
            log_tableid int4,
            log_actionseq int8,
            log_tablenspname text,
            log_tablerelname text,
            log_cmdtype "char",
            log_cmdupdncols int4,
            log_cmdargs text[]
        ) without oids;
        create index sl_log_2_idx1 on sl_log_2
            (log_origin, log_txid, log_actionseq);

        comment on table sl_log_2 is 'Stores each change to be propagated to subscriber nodes';
        comment on column sl_log_2.log_origin is 'Origin node from which the change came';
        comment on column sl_log_2.log_txid is 'Transaction ID on the origin node';
        comment on column sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect';
        comment on column sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas';
        comment on column sl_log_2.log_tablenspname is 'The schema name of the table affected';
        comment on column sl_log_2.log_tablerelname is 'The table name of the table affected';
        comment on column sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE';
        comment on column sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs';
        comment on column sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica';

        --
        -- Create sl_log_script
        --
        create table sl_log_script (
            log_origin int4,
            log_txid bigint,
            log_actionseq int8,
            log_cmdtype "char",
            log_cmdargs text[]
        ) WITHOUT OIDS;
        create index sl_log_script_idx1 on sl_log_script
            (log_origin, log_txid, log_actionseq);

        comment on table sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes';
        comment on column sl_log_script.log_origin is 'Origin name from which the change came';
        comment on column sl_log_script.log_txid is 'Transaction ID on the origin node';
        comment on column sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas';
        -- This comment describes the script command types (S/s), so it belongs
        -- on sl_log_script.log_cmdtype.  Attaching it to sl_log_2.log_cmdtype
        -- would replace the U/I/D/T description set above and leave
        -- sl_log_script.log_cmdtype without any comment.
        comment on column sl_log_script.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete';
        comment on column sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.';

        --
        -- Put the log apply triggers back onto sl_log_1/2
        --
        create trigger apply_trigger
            before INSERT on sl_log_1
            for each row execute procedure logApply('_schemadoc');
        alter table sl_log_1
            enable replica trigger apply_trigger;
        create trigger apply_trigger
            before INSERT on sl_log_2
            for each row execute procedure logApply('_schemadoc');
        alter table sl_log_2
            enable replica trigger apply_trigger;
    end if;

    -- Create a string_agg(text) aggregate in this schema if none is
    -- registered under '_schemadoc' yet
    if not exists (select 1
                     from information_schema.routines
                    where routine_schema = '_schemadoc'
                      and routine_name = 'string_agg')
    then
        CREATE AGGREGATE string_agg(text) (
            SFUNC=agg_text_sum,
            STYPE=text,
            INITCOND=''
        );
    end if;

    -- Create the sl_failover_targets view if it is not there yet
    if not exists (select 1
                     from information_schema.views
                    where table_schema='_schemadoc'
                      and table_name='sl_failover_targets')
    then
        create view sl_failover_targets as
            select set_id,
                   set_origin as set_origin,
                   sub1.sub_receiver as backup_id
              FROM sl_subscribe sub1
                  ,sl_set set1
             where sub1.sub_set=set_id
               and sub1.sub_forward=true
               --exclude candidates where the set_origin
               --has a path to a node but the failover
               --candidate has no path to that node
               and sub1.sub_receiver not in
                   (select p1.pa_client
                      from sl_path p1
                      left outer join sl_path p2
                        on (p2.pa_client=p1.pa_client
                            and p2.pa_server=sub1.sub_receiver)
                     where p2.pa_client is null
                       and p1.pa_server=set_origin
                       and p1.pa_client<>sub1.sub_receiver
                   )
               and sub1.sub_provider=set_origin
               --exclude any subscribers that are not
               --direct subscribers of all sets on the
               --origin
               and sub1.sub_receiver not in
                   (select direct_recv.sub_receiver
                      from
                          (--all direct receivers of the first set
                           select subs2.sub_receiver
                             from sl_subscribe subs2
                            where subs2.sub_provider=set1.set_origin
                              and subs2.sub_set=set1.set_id) as direct_recv
                      inner join
                          (--all other sets from the origin
                           select set_id
                             from sl_set set2
                            where set2.set_origin=set1.set_origin
                              and set2.set_id<>sub1.sub_set) as othersets
                        on(true)
                      left outer join sl_subscribe subs3
                        on(subs3.sub_set=othersets.set_id
                           and subs3.sub_forward=true
                           and subs3.sub_provider=set1.set_origin
                           and direct_recv.sub_receiver=subs3.sub_receiver)
                     where subs3.sub_receiver is null
                   );
    end if;

    -- Add sl_node.no_failed if it is missing
    if not check_table_field_exists('_schemadoc', 'sl_node', 'no_failed') then
        alter table sl_node add column no_failed bool;
        -- Deliberately unfiltered: every existing row gets the initial
        -- value false for the column that was just added.
        update sl_node set no_failed=false;
    end if;

    return p_old;
end;