Wed Oct 25 08:44:50 PDT 2006
- Previous message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Next message: [Slony1-commit] By cbbrowne: Replace a CONTINUE statement that doesn't work in PG 7.4
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Fixed archive log writing by moving global variables into the per node structure. Corrected node ids in the test_1_handover ducttape test and added handover scripts to tests 2 and 3. Modified Files: -------------- slony1-engine/src/ducttape: test_1_handover_to_1 (r1.1 -> r1.2) test_1_handover_to_2 (r1.1 -> r1.2) slony1-engine/src/slon: remote_worker.c (r1.126 -> r1.127) slon.h (r1.59 -> r1.60) Added Files: ----------- slony1-engine/src/ducttape: test_2_handover_to_1 (r1.2) test_2_handover_to_2 (r1.2) test_3_handover_to_1 (r1.2) test_3_handover_to_2 (r1.2) -------------- next part -------------- Index: test_1_handover_to_2 =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_2,v retrieving revision 1.1 retrieving revision 1.2 diff -Lsrc/ducttape/test_1_handover_to_2 -Lsrc/ducttape/test_1_handover_to_2 -u -w -r1.1 -r1.2 --- src/ducttape/test_1_handover_to_2 +++ src/ducttape/test_1_handover_to_2 @@ -3,8 +3,8 @@ # ********** # test_1_handover_to_2 # -# Script to change the origin of set 1 from node 1 to node 2. -# This still requires that node 1 is alive. This is called +# Script to change the origin of set 1 from node 11 to node 22. +# This still requires that node 11 is alive. This is called # handover or move, not failover. # ********** @@ -20,10 +20,10 @@ echo "**** Move set 1 to node 2" slonik <<_EOF_ cluster name = T1; - node 1 admin conninfo = 'dbname=$DB1'; - node 2 admin conninfo = 'dbname=$DB2'; + node 11 admin conninfo = 'dbname=$DB1'; + node 22 admin conninfo = 'dbname=$DB2'; - lock set (id = 1, origin = 1); - move set (id = 1, old origin = 1, new origin = 2); + lock set (id = 1, origin = 11); + move set (id = 1, old origin = 11, new origin = 22); _EOF_ Index: test_1_handover_to_1 =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_1,v retrieving revision 1.1 retrieving revision 1.2 diff -Lsrc/ducttape/test_1_handover_to_1 -Lsrc/ducttape/test_1_handover_to_1 -u -w -r1.1 -r1.2 --- src/ducttape/test_1_handover_to_1 +++ src/ducttape/test_1_handover_to_1 @@ -3,8 +3,8 @@ # ********** # test_1_handover_to_2 # -# Script to change the origin of set 1 from node 1 to node 2. -# This still requires that node 1 is alive. This is called +# Script to change the origin of set 1 from node 22 back to node 11. +# This still requires that both nodes are alive. This is called # handover or move, not failover. # ********** @@ -20,10 +20,10 @@ echo "**** Move set 1 to node 2" slonik <<_EOF_ cluster name = T1; - node 1 admin conninfo = 'dbname=$DB1'; - node 2 admin conninfo = 'dbname=$DB2'; + node 11 admin conninfo = 'dbname=$DB1'; + node 22 admin conninfo = 'dbname=$DB2'; - lock set (id = 1, origin = 2); - move set (id = 1, old origin = 2, new origin = 1); + lock set (id = 1, origin = 22); + move set (id = 1, old origin = 22, new origin = 11); _EOF_ Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.126 retrieving revision 1.127 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.126 -r1.127 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -254,25 +254,21 @@ WorkerGroupData * wd, SlonWorkMsg_event * event); static void *sync_helper(void *cdata); -static char archive_name[SLON_MAX_PATH]; -static char archive_tmp[SLON_MAX_PATH]; -static FILE *archive_fp = NULL; -static int open_log_archive(int node_id, char *seqbuf); -static int close_log_archive(); -static void terminate_log_archive(); -static int generate_archive_header(int node_id, const char *seqbuf); -static int submit_query_to_archive(SlonDString * ds); -static int submit_string_to_archive(const char *s); -#ifndef HAVE_PQPUTCOPYDATA -static int submit_raw_data_to_archive(const char *s); -#endif -static int logarchive_tracking(const char *namespace, int sub_set, const char *firstseq, + +static int archive_open(SlonNode *node, char *seqbuf); +static int archive_close(SlonNode *node); +static void archive_terminate(SlonNode *node); +static int archive_append_ds(SlonNode *node, SlonDString * ds); +static int archive_append_str(SlonNode *node, const char *s); +static int archive_append_data(SlonNode *node, const char *s, int len); +static int archive_tracking(SlonNode *node, const char *namespace, + int sub_set, const char *firstseq, const char *seqbuf, const char *timestamp); -static int write_void_log(int node_id, char *seqbuf, const char *message); +static int archive_void_log(SlonNode *node, char *seqbuf, const char *message); + static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery); -#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); /* ---------- * slon_remoteWorkerThread @@ -645,14 +641,10 @@ need_reloadListen = true; if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_NODE"); + rc = archive_void_log(node, seqbuf, "-- STORE_NODE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } - } } else if (strcmp(event->ev_type, "ENABLE_NODE") == 0) @@ -671,15 +663,11 @@ if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- ENABLE_NODE"); + rc = archive_void_log(node, seqbuf, "-- ENABLE_NODE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "DROP_NODE") == 0) { int no_id = (int)strtol(event->ev_data1, NULL, 10); @@ -730,15 +718,11 @@ need_reloadListen = true; if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_NODE"); + rc = archive_void_log(node, seqbuf, "-- DROP_NODE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "STORE_PATH") == 0) { int pa_server = (int)strtol(event->ev_data1, NULL, 10); @@ -757,15 +741,11 @@ need_reloadListen = true; if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_PATH"); + rc = archive_void_log(node, seqbuf, "-- STORE_PATH"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "DROP_PATH") == 0) { int pa_server = (int)strtol(event->ev_data1, NULL, 10); @@ -782,16 +762,11 @@ need_reloadListen = true; if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_PATH"); + rc = archive_void_log(node, seqbuf, "-- DROP_PATH"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); } } - } else if (strcmp(event->ev_type, "STORE_LISTEN") == 0) { int li_origin = (int)strtol(event->ev_data1, NULL, 10); @@ -807,15 +782,11 @@ li_origin, li_provider, li_receiver); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_LISTEN"); + rc = archive_void_log(node, seqbuf, "-- STORE_LISTEN"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "DROP_LISTEN") == 0) { int li_origin = (int)strtol(event->ev_data1, NULL, 10); @@ -831,14 +802,10 @@ li_origin, li_provider, li_receiver); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_LISTEN"); + rc = archive_void_log(node, seqbuf, "-- DROP_LISTEN"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } - } } else if (strcmp(event->ev_type, "STORE_SET") == 0) @@ -857,15 +824,11 @@ if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_SET"); + rc = archive_void_log(node, seqbuf, "-- STORE_SET"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "DROP_SET") == 0) { int set_id = (int)strtol(event->ev_data1, NULL, 10); @@ -881,39 +844,15 @@ */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } slon_mkquery(&lsquery, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, set_id); - rc = submit_query_to_archive(&lsquery); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &lsquery) < 0 || + archive_close(node) < 0) slon_retry(); } - rc = close_log_archive(); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - } } else if (strcmp(event->ev_type, "MERGE_SET") == 0) { @@ -933,40 +872,16 @@ */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } rc = slon_mkquery(&lsquery, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, add_id); - rc = submit_query_to_archive(&lsquery); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - rc = close_log_archive(); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &lsquery) < 0 || + archive_close(node) < 0) slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0) { /* @@ -976,15 +891,11 @@ */ if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE"); + rc = archive_void_log(node, seqbuf, "-- SET_ADD_TABLE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0) { /* @@ -994,15 +905,11 @@ */ if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE"); + rc = archive_void_log(node, seqbuf, "-- SET_ADD_SEQUENCE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0) { int tab_id = (int)strtol(event->ev_data1, NULL, 10); @@ -1012,15 +919,11 @@ tab_id); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE"); + rc = archive_void_log(node, seqbuf, "-- SET_DROP_TABLE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0) { int seq_id = (int)strtol(event->ev_data1, NULL, 10); @@ -1030,15 +933,11 @@ seq_id); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE"); + rc = archive_void_log(node, seqbuf, "-- SET_DROP_SEQUENCE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0) { int tab_id = (int)strtol(event->ev_data1, NULL, 10); @@ -1049,15 +948,11 @@ tab_id, new_set_id); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE"); + rc = archive_void_log(node, seqbuf, "-- SET_MOVE_TABLE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0) { int seq_id = (int)strtol(event->ev_data1, NULL, 10); @@ -1068,15 +963,11 @@ seq_id, new_set_id); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE"); + rc = archive_void_log(node, seqbuf, "-- SET_MOVE_SEQUENCE"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0) { int trig_tabid = (int)strtol(event->ev_data1, NULL, 10); @@ -1088,15 +979,11 @@ trig_tabid, trig_tgname); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER"); + rc = archive_void_log(node, seqbuf, "-- STORE_TRIGGER"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0) { int trig_tabid = (int)strtol(event->ev_data1, NULL, 10); @@ -1108,15 +995,11 @@ trig_tabid, trig_tgname); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER"); + rc = archive_void_log(node, seqbuf, "-- DROP_TRIGGER"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } } - } else if (strcmp(event->ev_type, "ACCEPT_SET") == 0) { int set_id, @@ -1289,14 +1172,10 @@ if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- FAILOVER_SET"); + rc = archive_void_log(node, seqbuf, "-- FAILOVER_SET"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } - } need_reloadListen = true; } else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0) @@ -1315,14 +1194,10 @@ sub_set, sub_provider, sub_receiver, sub_forward); if (archive_dir) { - rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET"); + rc = archive_void_log(node, seqbuf, "-- SUBSCRIBE_SET"); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_retry(); } - } need_reloadListen = true; } else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0) @@ -1453,39 +1328,15 @@ need_reloadListen = true; if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } slon_mkquery(&lsquery, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, sub_set); - rc = submit_query_to_archive(&lsquery); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &lsquery) < 0 || + archive_close(node) < 0) slon_retry(); } - rc = close_log_archive(); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - } } else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) { @@ -1572,50 +1423,19 @@ if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) { - rc = open_log_archive(node->no_id, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not open DDL archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0) slon_retry(); - } - generate_archive_header(node->no_id, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate DDL archive header %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_tracking(node, rtcfg_namespace, + ddl_setid, seqbuf, seqbuf, + event->ev_timestamp_c) < 0) slon_retry(); - } - rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate DDL archive tracker %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_retry(); - } - rc = submit_string_to_archive(ddl_script); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not submit DDL Script %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_append_str(node, ddl_script) < 0) slon_retry(); - } - - rc = close_log_archive(); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not close DDL Script %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_close(node) < 0) slon_retry(); } } } - } else if (strcmp(event->ev_type, "RESET_CONFIG") == 0) { int reset_config_setid = (int)strtol(event->ev_data1, NULL, 10); @@ -1626,14 +1446,16 @@ rtcfg_namespace, reset_config_setid, reset_configonly_on_node); if (archive_dir) - write_void_log(rtcfg_nodeid, seqbuf, "-- RESET_CONFIG"); + if (archive_void_log(node, seqbuf, "-- RESET_CONFIG") < 0) + slon_retry(); } else { printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n", node->no_id, event->ev_origin, event->ev_seqno, event->ev_type); if (archive_dir) - write_void_log(rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!"); + if (archive_void_log(node, seqbuf, "-- UNHANDLED EVENT!!!") < 0) + slon_retry(); } /* @@ -2681,34 +2503,16 @@ */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); + rc = archive_open(node, seqbuf); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not open COPY SET archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); - return -1; - } - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate COPY SET archive header %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_disconnectdb(pro_conn); - dstring_free(&query1); - dstring_free(&query2); - dstring_free(&query3); - dstring_free(&lsquery); - dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2727,7 +2531,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2761,7 +2565,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (*(PQgetvalue(res1, 0, 0)) == 't') @@ -2776,7 +2580,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res1); @@ -2794,7 +2598,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2842,7 +2646,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -2880,7 +2684,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2908,7 +2712,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2938,7 +2742,7 @@ slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query3); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2959,7 +2763,7 @@ slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query3); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3000,7 +2804,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -3027,7 +2831,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3066,7 +2870,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -3108,7 +2912,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -3136,7 +2940,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -3161,7 +2965,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " @@ -3201,7 +3005,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3226,7 +3030,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples2 = PQntuples(res2); @@ -3245,7 +3049,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3277,7 +3081,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3299,7 +3103,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3332,7 +3136,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) @@ -3340,20 +3144,16 @@ slon_mkquery(&query1, "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname, nodeon73 ? "" : PQgetvalue(res3, 0, 0)); - rc = submit_query_to_archive(&query1); + rc = archive_append_ds(node, &query1); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_d: " - "Could not generate copy_set request for %s - %s", - node->no_id, tab_fqname, strerror(errno)); - slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3386,7 +3186,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3418,17 +3218,14 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - rc = fwrite(copydata, 1, len, archive_fp); - if (rc != len) + rc = archive_append_data(node, copydata, len); + if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "PQputCopyData() - log shipping - %s", - node->no_id, strerror(errno)); #ifdef SLON_MEMDEBUG memset(copydata, 88, len); #endif @@ -3443,7 +3240,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3468,7 +3265,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3493,7 +3290,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res3); @@ -3514,7 +3311,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } res2 = PQgetResult(loc_dbconn); @@ -3532,12 +3329,12 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - rc = submit_string_to_archive("\\."); + rc = archive_append_str(node, "\\."); if (rc < 0) { PQclear(res2); PQclear(res1); @@ -3547,7 +3344,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3574,7 +3371,7 @@ PQputline(loc_dbconn, copybuf); PQputline(loc_dbconn, "\n"); if (archive_dir) { - rc = submit_string_to_archive(copybuf); + rc = archive_append_str(node, copybuf); if (rc < 0) { PQclear(res2); PQclear(res1); @@ -3584,7 +3381,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3592,7 +3389,7 @@ case 1: PQputline(loc_dbconn, copybuf); if (archive_dir) { - rc = submit_raw_data_to_archive(copybuf); + rc = archive_append_data(node, copybuf, strlen(copybuf)); if (rc < 0) { PQclear(res2); PQclear(res1); @@ -3602,7 +3399,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3615,7 +3412,7 @@ PQputline(loc_dbconn, "\\.\n"); if (archive_dir) { - rc = submit_string_to_archive("\\."); + rc = archive_append_str(node, "\\."); if (rc < 0) { PQclear(res2); PQclear(res1); @@ -3625,7 +3422,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3648,7 +3445,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res3); @@ -3674,7 +3471,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } #endif /* HAVE_PQPUTCOPYDATA */ @@ -3700,12 +3497,12 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - rc = submit_query_to_archive(&query1); + rc = archive_append_ds(node, &query1); if (rc < 0) { return -1; } @@ -3755,7 +3552,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -3782,7 +3579,7 @@ if (archive_dir) { - rc = submit_query_to_archive(&query1); + rc = archive_append_ds(node, &query1); if (rc < 0) { PQclear(res1); slon_disconnectdb(pro_conn); @@ -3791,7 +3588,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3814,7 +3611,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3862,7 +3659,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -3877,7 +3674,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQgetisnull(res1, 0, 0)) @@ -3928,7 +3725,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -3943,7 +3740,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ssy_seqno = PQgetvalue(res1, 0, 0); @@ -3990,7 +3787,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res2); @@ -4035,7 +3832,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -4050,7 +3847,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } dstring_init(&ssy_action_list); @@ -4083,7 +3880,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) @@ -4092,7 +3889,7 @@ "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) " "values ('%d', '%d');", rtcfg_namespace, set_id, ssy_seqno); - rc = submit_query_to_archive(&lsquery); + rc = archive_append_ds(node, &lsquery); if (rc < 0) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " @@ -4104,7 +3901,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -4116,19 +3913,16 @@ if (archive_dir) { - rc = close_log_archive(); + rc = archive_close(node); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - " could not close archive log %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -4147,7 +3941,7 @@ dstring_free(&query3); dstring_free(&lsquery); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } slon_disconnectdb(pro_conn); @@ -4217,21 +4011,10 @@ */ if (archive_dir) { - rc = open_log_archive(node->no_id, seqbuf); - if (rc == -1) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot open archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - dstring_free(&query); - return 60; - } - rc = generate_archive_header(node->no_id, seqbuf); + rc = archive_open(node, seqbuf); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); + dstring_free(&query); return 60; } } @@ -4257,7 +4040,8 @@ slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "No pa_conninfo for data provider %d\n", node->no_id, provider->no_id); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } sprintf(conn_symname, "subscriber_%d_provider_%d", @@ -4270,7 +4054,8 @@ "cannot connect to data provider %d on '%s'\n", node->no_id, provider->no_id, provider->pa_conninfo); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return provider->pa_connretry; } @@ -4282,7 +4067,8 @@ rtcfg_namespace, rtcfg_nodeid); if (query_execute(node, provider->conn->dbconn, &query) < 0) { - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return provider->pa_connretry; @@ -4318,7 +4104,8 @@ "for ev_origin %d\n", node->no_id, provider->no_id, event->ev_origin); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } if (prov_seqno < event->ev_seqno) @@ -4328,7 +4115,8 @@ "ev_seqno " INT64_FORMAT " for ev_origin %d\n", node->no_id, provider->no_id, prov_seqno, event->ev_origin); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -4395,7 +4183,8 @@ PQresultErrorMessage(res1)); PQclear(res1); dstring_free(&new_qual); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } @@ -4444,7 +4233,8 @@ PQclear(res2); PQclear(res1); dstring_free(&new_qual); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } ntuples2 = PQntuples(res2); @@ -4578,16 +4368,11 @@ slon_log(SLON_DEBUG2, "writing archive log...\n"); fflush(stderr); fflush(stdout); - rc = logarchive_tracking(rtcfg_namespace, sub_set, + rc = archive_tracking(node, rtcfg_namespace, sub_set, PQgetvalue(res1, tupno1, 1), seqbuf, event->ev_timestamp_c); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + slon_retry(); } } PQclear(res1); @@ -4620,14 +4405,9 @@ dstring_free(&query); if (archive_dir) { - rc = close_log_archive(); + rc = archive_close(node); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not close out archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + slon_retry(); } return 0; } @@ -4644,7 +4424,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return 20; @@ -4655,7 +4436,8 @@ slon_log(SLON_ERROR, "remoteWorkerThread_%d: cannot determine current log status\n", node->no_id); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return 20; @@ -4778,19 +4560,9 @@ */ if (archive_dir) { - rc = submit_string_to_archive(dstring_data(&(wgline->data))); - - /* - * rc = fprintf(archive_fp, "%s", - * dstring_data(&(wgline->data))); - */ + rc = archive_append_ds(node, &(wgline->data)); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + slon_retry(); } break; @@ -4889,7 +4661,8 @@ */ if (num_errors != 0) { - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; @@ -4926,7 +4699,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return 20; @@ -4944,7 +4718,8 @@ if (query_execute(node, local_dbconn, &query) < 0) { PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } @@ -4957,14 +4732,9 @@ "select %s.sequenceSetValue_offline(%s,'%s');\n", rtcfg_namespace, seql_seqid, seql_last_value); - rc = submit_query_to_archive(&lsquery); + rc = archive_append_ds(node, &lsquery); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } + slon_retry(); } } PQclear(res1); @@ -5006,7 +4776,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; @@ -5032,7 +4803,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } if (PQntuples(res1) > 0) @@ -5046,7 +4818,8 @@ if (query_execute(node, local_dbconn, &query) < 0) { PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -5061,18 +4834,13 @@ */ if (archive_dir) { - rc = close_log_archive(); + rc = archive_close(node); if (rc < 0) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not close out archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); - return 60; + slon_retry(); - } if (command_on_logarchive) { char command[512]; - sprintf(command, "%s %s", command_on_logarchive, archive_name); + sprintf(command, "%s %s", command_on_logarchive, node->archive_name); slon_log(SLON_INFO, "remoteWorkerThread_%d: Run Archive Command %s\n", node->no_id, command); system(command); @@ -5760,7 +5528,7 @@ /* ---------- * Functions for processing log archives... * - * - First, you open the log archive using open_log_archive() + * - First, you open the log archive using archive_open() * * - Second, you generate the header using generate_archive_header() * @@ -5783,47 +5551,76 @@ /* ---------- - * open_log_archive + * archive_open * * Stores the archive name in archive_name (as .sql name) and * archive_tmp (.tmp file) * ---------- */ -int -open_log_archive(int node_id, char *seqbuf) +static int +archive_open(SlonNode *node, char *seqbuf) { int i; + int rc; - sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id); + if (node->archive_name == NULL) + { + node->archive_name = malloc(SLON_MAX_PATH); + node->archive_temp = malloc(SLON_MAX_PATH); + if (node->archive_name == NULL || node->archive_temp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Out of memory in archive_open()\n", + node->no_id); + return -1; + } + } + + sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir, + node->no_id); for (i = strlen(seqbuf); i < 20; i++) - strcat(archive_name, "0"); - strcat(archive_name, seqbuf); - strcat(archive_name, ".sql"); - strcpy(archive_tmp, archive_name); - strcat(archive_tmp, ".tmp"); - archive_fp = fopen(archive_tmp, "w"); - if (archive_fp == NULL) + strcat(node->archive_name, "0"); + strcat(node->archive_name, seqbuf); + strcat(node->archive_name, ".sql"); + strcpy(node->archive_temp, node->archive_name); + strcat(node->archive_temp, ".tmp"); + node->archive_fp = fopen(node->archive_temp, "w"); + if (node->archive_fp == NULL) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot open archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); return -1; } - else + + rc = fprintf(node->archive_fp, + "-- Slony-I log shipping archive\n" + "-- Node %d, Event %s\n" + "start transaction;\n", + node->no_id, seqbuf); + if (rc < 0) { - return 0; + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); + return -1; } + + return 0; } /* ---------- - * close_log_archive + * archive_close * ---------- */ -int -close_log_archive() +static int +archive_close(SlonNode *node) { int rc = 0; if (archive_dir) { - rc = fprintf(archive_fp, + rc = fprintf(node->archive_fp, "\n------------------------------------------------------------------\n" "-- End Of Archive Log\n" "------------------------------------------------------------------\n" @@ -5831,116 +5628,166 @@ "vacuum analyze %s.sl_setsync_offline;\n", rtcfg_namespace); if (rc < 0) + { + archive_terminate(node); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); return -1; - rc = fclose(archive_fp); - archive_fp = NULL; + } + + rc = fclose(node->archive_fp); + node->archive_fp = NULL; if (rc != 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot close archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); return -1; - rc = rename(archive_tmp, archive_name); } - return rc; + + rc = rename(node->archive_temp, node->archive_name); + if (rc != 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot rename archive file %s to %s - %s\n", + node->no_id, node->archive_temp, node->archive_name, + strerror(errno)); + return -1; + } + } + + return 0; } /* ---------- - * logarchive_tracking + * archive_terminate * ---------- */ -int -logarchive_tracking(const char *namespace, int sub_set, const char *firstseq, - const char *seqbuf, const char *timestamp) +static void +archive_terminate(SlonNode *node) +{ + if (node->archive_fp != NULL) { - return fprintf(archive_fp, + fclose(node->archive_fp); + node->archive_fp = NULL; + } +} + +/* ---------- + * archive_tracking + * ---------- + */ +static int +archive_tracking(SlonNode *node, const char *namespace, int sub_set, + const char *firstseq, const char *seqbuf, + const char *timestamp) +{ + int rc; + + rc = fprintf(node->archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" "-- end of log archiving header\n" "------------------------------------------------------------------\n" "-- start of Slony-I data\n" "------------------------------------------------------------------\n", namespace, sub_set, firstseq, seqbuf, timestamp); + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); + return -1; } -/* ---------- - * submit_query_to_archive - * ---------- - */ -int -submit_query_to_archive(SlonDString * ds) -{ - return fprintf(archive_fp, "%s\n", ds->data); + return 0; } /* ---------- - * submit_string_to_archive + * archive_append_ds * ---------- */ -int -submit_string_to_archive(const char *s) +static int +archive_append_ds(SlonNode *node, SlonDString * ds) { - return fprintf(archive_fp, "%s\n", s); -} + int rc; -#ifndef HAVE_PQPUTCOPYDATA -/* ---------- - * submit_raw_data_to_archive - * - * Raw form used for COPY where we don't want any extra cr/lf output - * ---------- - */ -int -submit_raw_data_to_archive(const char *s) + rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds)); + if (rc < 0) { - return fprintf(archive_fp, "%s", s); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); + return -1; + } + + return 0; } -#endif /* ---------- - * terminate_log_archive + * archive_append_str * ---------- */ -void -terminate_log_archive() +static int +archive_append_str(SlonNode *node, const char *s) { - if (archive_fp) + int rc; + + rc = fprintf(node->archive_fp, "%s\n", s); + if (rc < 0) { - fclose(archive_fp); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); + return -1; } + + return 0; } /* ---------- - * generate_archive_header + * archive_append_data + * + * Raw form used for COPY where we don't want any extra cr/lf output * ---------- */ -int -generate_archive_header(int node_id, const char *seqbuf) +static int +archive_append_data(SlonNode *node, const char *s, int len) { - return fprintf(archive_fp, - "-- Slony-I log shipping archive\n" - "-- Node %d, Event %s\n" - "start transaction;\n", - node_id, seqbuf); + int rc; + + rc = fwrite(s, len, 1, node->archive_fp); + if (rc != 1) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s\n", + node->no_id, node->archive_temp, strerror(errno)); + return -1; + } + + return 0; } /* ---------- - * write_void_log + * archive_void_log * * writes out a "void" log consisting of the message which must either * be a valid SQL query or a SQL comment. * ---------- */ -int -write_void_log(int node_id, char *seqbuf, const char *message) +static int +archive_void_log(SlonNode *node, char *seqbuf, const char *message) { int rc; - rc = open_log_archive(node_id, seqbuf); - if (rc < 0) - return rc; - rc = generate_archive_header(node_id, seqbuf); + rc = archive_open(node, seqbuf); if (rc < 0) return rc; - rc = submit_string_to_archive(message); + rc = archive_append_str(node, message); if (rc < 0) return rc; - rc = close_log_archive(); + rc = archive_close(node); + return rc; } Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.59 retrieving revision 1.60 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.59 -r1.60 --- src/slon/slon.h +++ src/slon/slon.h @@ -106,6 +106,10 @@ SlonWorkMsg *message_head; SlonWorkMsg *message_tail; + char *archive_name; + char *archive_temp; + FILE *archive_fp; + SlonNode *prev; SlonNode *next; };
- Previous message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Next message: [Slony1-commit] By cbbrowne: Replace a CONTINUE statement that doesn't work in PG 7.4
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list