Thu May 31 06:29:29 PDT 2007
- Previous message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Next message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv1075/src/slon
Modified Files:
remote_worker.c
Log Message:
Fix archive log shipping tracking. Slon now tracks the setsync status in memory
and generates a void archive with the correct old and new event seqno for all
events.
Jan
Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.138
retrieving revision 1.139
diff -C2 -d -r1.138 -r1.139
*** remote_worker.c 20 Apr 2007 20:53:18 -0000 1.138
--- remote_worker.c 31 May 2007 13:29:27 -0000 1.139
***************
*** 111,114 ****
--- 111,115 ----
int set_id;
int sub_forward;
+ char ssy_seqno[64];
ProviderSet *prev;
***************
*** 261,265 ****
int sub_set, const char *firstseq,
const char *seqbuf, const char *timestamp);
- static int archive_void_log(SlonNode *node, char *seqbuf, const char *message);
--- 262,265 ----
***************
*** 291,295 ****
bool event_ok;
bool need_reloadListen = false;
- int rc;
slon_log(SLON_DEBUG1,
--- 291,294 ----
***************
*** 372,375 ****
--- 371,421 ----
adjust_provider_info(node, wd, false);
curr_config = rtcfg_seq_get();
+
+ /*
+ * If we do archive logging, we need the ssy_seqno of
+ * all sets this worker is replicating.
+ */
+ if (archive_dir)
+ {
+ ProviderInfo *pinfo;
+ ProviderSet *pset;
+ PGresult *res;
+
+ for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next)
+ {
+ for(pset = pinfo->set_head; pset != NULL; pset = pset->next)
+ {
+ slon_mkquery(&query1,
+ "select max(ssy_seqno) from %s.sl_setsync "
+ " where ssy_setid = %d "
+ " and ssy_origin = %d; ",
+ rtcfg_namespace, pset->set_id, node->no_id);
+ if (query_execute(node, local_dbconn, &query1) < 0)
+ slon_retry();
+
+ res = PQexec(local_dbconn, dstring_data(&query1));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ slon_log(SLON_FATAL, "remoteWorkerThread_%d: \"%s\" %s",
+ node->no_id, dstring_data(&query1),
+ PQresultErrorMessage(res));
+ PQclear(res);
+ slon_retry();
+ }
+ if (PQntuples(res) != 1)
+ {
+ slon_log(SLON_FATAL, "remoteWorkerThread_%d: Query \"%s\" did not return one row\n",
+ node->no_id, dstring_data(&query1));
+ PQclear(res);
+ slon_retry();
+ }
+ strcpy(pset->ssy_seqno, PQgetvalue(res, 0, 0));
+ PQclear(res);
+
+ slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: set %d starts at ssy_seqno %s\n",
+ node->no_id, pset->set_id, pset->ssy_seqno);
+ }
+ }
+ }
}
rtcfg_unlock();
***************
*** 622,625 ****
--- 668,705 ----
/*
+ * For all non-SYNC events, we write at least a standard
+ * event tracking log file and adjust the ssy_seqno in our
+ * internal tracking.
+ */
+ if (archive_dir)
+ {
+ ProviderInfo *pinfo;
+ ProviderSet *pset;
+ char buf[256];
+
+ if (archive_open(node, seqbuf) < 0)
+ slon_retry();
+ sprintf(buf, "-- %s", event->ev_type);
+ if (archive_append_str(node, buf) < 0)
+ slon_retry();
+
+ for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next)
+ {
+ for(pset = pinfo->set_head; pset != NULL; pset = pset->next)
+ {
+ if (archive_tracking(node, rtcfg_namespace,
+ pset->set_id, pset->ssy_seqno, seqbuf,
+ event->ev_timestamp_c) < 0)
+ slon_retry();
+ strcpy(pset->ssy_seqno, seqbuf);
+ }
+ }
+
+ /*
+ * Leave the archive open for event specific actions.
+ */
+ }
+
+ /*
* Simple configuration events. Call the corresponding runtime
* config function, add the query to call the configuration event
***************
*** 641,651 ****
need_reloadListen = true;
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- STORE_NODE");
- if (rc < 0)
- slon_retry();
- }
-
}
else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
--- 721,724 ----
***************
*** 662,672 ****
need_reloadListen = true;
-
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- ENABLE_NODE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DROP_NODE") == 0)
--- 735,738 ----
***************
*** 718,727 ****
need_reloadListen = true;
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- DROP_NODE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "STORE_PATH") == 0)
--- 784,787 ----
***************
*** 741,750 ****
need_reloadListen = true;
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- STORE_PATH");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DROP_PATH") == 0)
--- 801,804 ----
***************
*** 762,771 ****
need_reloadListen = true;
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- DROP_PATH");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
--- 816,819 ----
***************
*** 782,791 ****
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- STORE_LISTEN");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
--- 830,833 ----
***************
*** 802,812 ****
rtcfg_namespace,
li_origin, li_provider, li_receiver);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- DROP_LISTEN");
- if (rc < 0)
- slon_retry();
- }
-
}
else if (strcmp(event->ev_type, "STORE_SET") == 0)
--- 844,847 ----
***************
*** 823,833 ****
rtcfg_namespace,
set_id, set_origin, set_comment);
-
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- STORE_SET");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DROP_SET") == 0)
--- 858,861 ----
***************
*** 846,856 ****
if (archive_dir)
{
! (void) slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, set_id);
! if (archive_open(node, seqbuf) < 0 ||
! archive_append_ds(node, &lsquery) < 0 ||
! archive_close(node) < 0)
slon_retry();
}
--- 874,882 ----
if (archive_dir)
{
! slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, set_id);
! if (archive_append_ds(node, &lsquery) < 0)
slon_retry();
}
***************
*** 874,884 ****
if (archive_dir)
{
! rc = slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, add_id);
! if (archive_open(node, seqbuf) < 0 ||
! archive_append_ds(node, &lsquery) < 0 ||
! archive_close(node) < 0)
slon_retry();
}
--- 900,908 ----
if (archive_dir)
{
! slon_mkquery(&lsquery,
"delete from %s.sl_setsync_offline "
" where ssy_setid= %d;",
rtcfg_namespace, add_id);
! if (archive_append_ds(node, &lsquery) < 0)
slon_retry();
}
***************
*** 891,900 ****
* in the runtime configuration.
*/
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_ADD_TABLE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
--- 915,918 ----
***************
*** 905,914 ****
* maintained in the runtime configuration.
*/
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_ADD_SEQUENCE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
--- 923,926 ----
***************
*** 919,928 ****
rtcfg_namespace,
tab_id);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_DROP_TABLE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
--- 931,934 ----
***************
*** 933,942 ****
rtcfg_namespace,
seq_id);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_DROP_SEQUENCE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
--- 939,942 ----
***************
*** 948,957 ****
rtcfg_namespace,
tab_id, new_set_id);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_MOVE_TABLE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
--- 948,951 ----
***************
*** 963,972 ****
rtcfg_namespace,
seq_id, new_set_id);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SET_MOVE_SEQUENCE");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
--- 957,960 ----
***************
*** 979,988 ****
rtcfg_namespace,
trig_tabid, trig_tgname);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- STORE_TRIGGER");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
--- 967,970 ----
***************
*** 995,1004 ****
rtcfg_namespace,
trig_tabid, trig_tgname);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- DROP_TRIGGER");
- if (rc < 0)
- slon_retry();
- }
}
else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
--- 977,980 ----
***************
*** 1098,1101 ****
--- 1074,1078 ----
query_execute(node, local_dbconn, &query1);
slon_log(SLON_DEBUG2, "ACCEPT_SET - done\n");
+ archive_close(node);
slon_retry();
***************
*** 1172,1181 ****
failed_node, backup_node, set_id, seqbuf);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- FAILOVER_SET");
- if (rc < 0)
- slon_retry();
- }
need_reloadListen = true;
}
--- 1149,1152 ----
***************
*** 1194,1203 ****
rtcfg_namespace,
sub_set, sub_provider, sub_receiver, sub_forward);
- if (archive_dir)
- {
- rc = archive_void_log(node, seqbuf, "-- SUBSCRIBE_SET");
- if (rc < 0)
- slon_retry();
- }
need_reloadListen = true;
}
--- 1165,1168 ----
***************
*** 1334,1340 ****
" where ssy_setid= %d;",
rtcfg_namespace, sub_set);
! if (archive_open(node, seqbuf) < 0 ||
! archive_append_ds(node, &lsquery) < 0 ||
! archive_close(node) < 0)
slon_retry();
}
--- 1299,1303 ----
" where ssy_setid= %d;",
rtcfg_namespace, sub_set);
! if (archive_append_ds(node, &lsquery) < 0)
slon_retry();
}
***************
*** 1425,1438 ****
{
- if (archive_open(node, seqbuf) < 0)
- slon_retry();
- if (archive_tracking(node, rtcfg_namespace,
- ddl_setid, seqbuf, seqbuf,
- event->ev_timestamp_c) < 0)
- slon_retry();
if (archive_append_str(node, ddl_script) < 0)
slon_retry();
- if (archive_close(node) < 0)
- slon_retry();
}
}
--- 1388,1393 ----
***************
*** 1447,1453 ****
rtcfg_namespace,
reset_config_setid, reset_configonly_on_node);
- if (archive_dir)
- if (archive_void_log(node, seqbuf, "-- RESET_CONFIG") < 0)
- slon_retry();
}
else
--- 1402,1405 ----
***************
*** 1455,1461 ****
printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
- if (archive_dir)
- if (archive_void_log(node, seqbuf, "-- UNHANDLED EVENT!!!") < 0)
- slon_retry();
}
--- 1407,1410 ----
***************
*** 1468,1475 ****
--- 1417,1427 ----
query_append_event(&query1, event);
slon_appendquery(&query1, "commit transaction;");
+ if (archive_close(node) < 0)
+ slon_retry();
}
else
{
(void) slon_mkquery(&query1, "rollback transaction;");
+ archive_terminate(node);
}
if (query_execute(node, local_dbconn, &query1) < 0)
***************
*** 2496,2522 ****
sprintf(seqbuf, INT64_FORMAT, event->ev_seqno);
-
- /* Log Shipping Support begins... */
-
- /*
- * - Open the log, put the header in Isn't it convenient that seqbuf was
- * just populated??? :-)
- */
- if (archive_dir)
- {
- rc = archive_open(node, seqbuf);
- if (rc < 0)
- {
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- dstring_free(&query2);
- dstring_free(&query3);
- dstring_free(&lsquery);
- dstring_free(&indexregenquery);
- archive_terminate(node);
- return -1;
- }
- }
-
/*
* Register this connection in sl_nodelock
--- 2448,2451 ----
***************
*** 2982,2986 ****
{
(void) slon_mkquery(&query1,
! "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
nodeon73 ? "" : PQgetvalue(res3, 0, 0));
rc = archive_append_ds(node, &query1);
--- 2911,2915 ----
{
(void) slon_mkquery(&query1,
! "delete from %s;\ncopy %s %s from stdin;", tab_fqname, tab_fqname,
nodeon73 ? "" : PQgetvalue(res3, 0, 0));
rc = archive_append_ds(node, &query1);
***************
*** 3712,3719 ****
set_id, node->no_id, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip,
dstring_data(&ssy_action_list));
- PQclear(res1);
dstring_free(&ssy_action_list);
if (query_execute(node, loc_dbconn, &query1) < 0)
{
slon_disconnectdb(pro_conn);
dstring_free(&query1);
--- 3641,3648 ----
set_id, node->no_id, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip,
dstring_data(&ssy_action_list));
dstring_free(&ssy_action_list);
if (query_execute(node, loc_dbconn, &query1) < 0)
{
+ PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
***************
*** 3737,3740 ****
--- 3666,3670 ----
" could not insert to sl_setsync_offline",
node->no_id);
+ PQclear(res1);
slon_disconnectdb(pro_conn);
dstring_free(&query1);
***************
*** 3747,3750 ****
--- 3677,3681 ----
}
}
+ PQclear(res1);
gettimeofday(&tv_now, NULL);
slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
***************
*** 3753,3773 ****
TIMEVAL_DIFF(&tv_start2, &tv_now));
- if (archive_dir)
- {
- rc = archive_close(node);
- if (rc < 0)
- {
- slon_disconnectdb(pro_conn);
- dstring_free(&query1);
- dstring_free(&query2);
- dstring_free(&query3);
- dstring_free(&lsquery);
- dstring_free(&indexregenquery);
- archive_terminate(node);
- return -1;
- }
- }
-
-
/*
* Roll back the transaction we used on the provider and close the
--- 3684,3687 ----
***************
*** 5379,5382 ****
--- 5293,5299 ----
int rc;
+ if (!archive_dir)
+ return 0;
+
if (node->archive_name == NULL)
{
***************
*** 5392,5395 ****
--- 5309,5320 ----
}
+ if (node->archive_fp != NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "archive_open() called - archive is already opened\n",
+ node->no_id);
+ return -1;
+ }
+
sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir,
node->no_id);
***************
*** 5434,5474 ****
int rc = 0;
! if (archive_dir)
{
! rc = fprintf(node->archive_fp,
! "\n------------------------------------------------------------------\n"
! "-- End Of Archive Log\n"
! "------------------------------------------------------------------\n"
! "commit;\n"
! "vacuum analyze %s.sl_setsync_offline;\n",
! rtcfg_namespace);
! if (rc < 0)
! {
! archive_terminate(node);
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot write to archive file %s - %s\n",
! node->no_id, node->archive_temp, strerror(errno));
! return -1;
! }
! rc = fclose(node->archive_fp);
! node->archive_fp = NULL;
! if (rc != 0)
! {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot close archive file %s - %s\n",
! node->no_id, node->archive_temp, strerror(errno));
! return -1;
! }
! rc = rename(node->archive_temp, node->archive_name);
! if (rc != 0)
! {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot rename archive file %s to %s - %s\n",
! node->no_id, node->archive_temp, node->archive_name,
! strerror(errno));
! return -1;
! }
}
--- 5359,5407 ----
int rc = 0;
! if (!archive_dir)
! return 0;
!
! if (node->archive_fp == NULL)
{
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot close archive file %s - not open\n",
! node->no_id, node->archive_temp);
! return -1;
! }
! rc = fprintf(node->archive_fp,
! "\n------------------------------------------------------------------\n"
! "-- End Of Archive Log\n"
! "------------------------------------------------------------------\n"
! "commit;\n"
! "vacuum analyze %s.sl_setsync_offline;\n",
! rtcfg_namespace);
! if (rc < 0)
! {
! archive_terminate(node);
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot write to archive file %s - %s\n",
! node->no_id, node->archive_temp, strerror(errno));
! return -1;
! }
! rc = fclose(node->archive_fp);
! node->archive_fp = NULL;
! if (rc != 0)
! {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot close archive file %s - %s\n",
! node->no_id, node->archive_temp, strerror(errno));
! return -1;
! }
!
! rc = rename(node->archive_temp, node->archive_name);
! if (rc != 0)
! {
! slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
! "Cannot rename archive file %s to %s - %s\n",
! node->no_id, node->archive_temp, node->archive_name,
! strerror(errno));
! return -1;
}
***************
*** 5501,5504 ****
--- 5434,5448 ----
int rc;
+ if (!archive_dir)
+ return 0;
+
+ if (node->archive_fp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - not open\n",
+ node->no_id, node->archive_temp);
+ return -1;
+ }
+
rc = fprintf(node->archive_fp,
"\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
***************
*** 5528,5531 ****
--- 5472,5486 ----
int rc;
+ if (!archive_dir)
+ return 0;
+
+ if (node->archive_fp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - not open\n",
+ node->no_id, node->archive_temp);
+ return -1;
+ }
+
rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds));
if (rc < 0)
***************
*** 5549,5552 ****
--- 5504,5518 ----
int rc;
+ if (!archive_dir)
+ return 0;
+
+ if (node->archive_fp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - not open\n",
+ node->no_id, node->archive_temp);
+ return -1;
+ }
+
rc = fprintf(node->archive_fp, "%s\n", s);
if (rc < 0)
***************
*** 5572,5575 ****
--- 5538,5552 ----
int rc;
+ if (!archive_dir)
+ return 0;
+
+ if (node->archive_fp == NULL)
+ {
+ slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+ "Cannot write to archive file %s - not open\n",
+ node->no_id, node->archive_temp);
+ return -1;
+ }
+
rc = fwrite(s, len, 1, node->archive_fp);
if (rc != 1)
***************
*** 5585,5611 ****
/* ----------
- * archive_void_log
- *
- * writes out a "void" log consisting of the message which must either
- * be a valid SQL query or a SQL comment.
- * ----------
- */
- static int
- archive_void_log(SlonNode *node, char *seqbuf, const char *message)
- {
- int rc;
-
- rc = archive_open(node, seqbuf);
- if (rc < 0)
- return rc;
- rc = archive_append_str(node, message);
- if (rc < 0)
- return rc;
- rc = archive_close(node);
-
- return rc;
- }
-
- /* ----------
* given a string consisting of a list of actionseq values, return a
* string that compresses this into a set of log_actionseq ranges
--- 5562,5565 ----
- Previous message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Next message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list