Thu May 31 06:29:50 PDT 2007
- Previous message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Next message: [Slony1-commit] slony1-engine configure
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon In directory main.slony.info:/tmp/cvs-serv1063/src/slon Modified Files: Tag: REL_1_2_STABLE remote_worker.c Log Message: Fix archive log ship tracking. Slon now tracks the setsync status in memory and generates a void archive with the correct old,new event seqno for all events. Jan Index: remote_worker.c =================================================================== RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.124.2.13 retrieving revision 1.124.2.14 diff -C2 -d -r1.124.2.13 -r1.124.2.14 *** remote_worker.c 3 Apr 2007 21:55:03 -0000 1.124.2.13 --- remote_worker.c 31 May 2007 13:29:48 -0000 1.124.2.14 *************** *** 114,117 **** --- 114,118 ---- int set_id; int sub_forward; + char ssy_seqno[64]; ProviderSet *prev; *************** *** 264,268 **** int sub_set, const char *firstseq, const char *seqbuf, const char *timestamp); - static int archive_void_log(SlonNode *node, char *seqbuf, const char *message); --- 265,268 ---- *************** *** 297,301 **** int event_ok; int need_reloadListen = false; - int rc; slon_log(SLON_DEBUG1, --- 297,300 ---- *************** *** 373,376 **** --- 372,422 ---- adjust_provider_info(node, wd, false); curr_config = rtcfg_seq_get(); + + /* + * If we do archive logging, we need the ssy_seqno of + * all sets this worker is replicating. + */ + if (archive_dir) + { + ProviderInfo *pinfo; + ProviderSet *pset; + PGresult *res; + + for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next) + { + for(pset = pinfo->set_head; pset != NULL; pset = pset->next) + { + slon_mkquery(&query1, + "select max(ssy_seqno) from %s.sl_setsync " + " where ssy_setid = %d " + " and ssy_origin = %d; ", + rtcfg_namespace, pset->set_id, node->no_id); + if (query_execute(node, local_dbconn, &query1) < 0) + slon_retry(); + + res = PQexec(local_dbconn, dstring_data(&query1)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query1), + PQresultErrorMessage(res)); + PQclear(res); + slon_retry(); + } + if (PQntuples(res) != 1) + { + slon_log(SLON_FATAL, "remoteWorkerThread_%d: Query \"%s\" did not return one row\n", + node->no_id, dstring_data(&query1)); + PQclear(res); + slon_retry(); + } + strcpy(pset->ssy_seqno, PQgetvalue(res, 0, 0)); + PQclear(res); + + slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: set %d starts at ssy_seqno %s\n", + node->no_id, pset->set_id, pset->ssy_seqno); + } + } + } } rtcfg_unlock(); *************** *** 623,626 **** --- 669,706 ---- /* + * For all non-SYNC events, we write at least a standard + * event tracking log file and adjust the ssy_seqno in our + * internal tracking. + */ + if (archive_dir) + { + ProviderInfo *pinfo; + ProviderSet *pset; + char buf[256]; + + if (archive_open(node, seqbuf) < 0) + slon_retry(); + sprintf(buf, "-- %s", event->ev_type); + if (archive_append_str(node, buf) < 0) + slon_retry(); + + for (pinfo=wd->provider_head; pinfo != NULL; pinfo = pinfo->next) + { + for(pset = pinfo->set_head; pset != NULL; pset = pset->next) + { + if (archive_tracking(node, rtcfg_namespace, + pset->set_id, pset->ssy_seqno, seqbuf, + event->ev_timestamp_c) < 0) + slon_retry(); + strcpy(pset->ssy_seqno, seqbuf); + } + } + + /* + * Leave the archive open for event specific actions. + */ + } + + /* * Simple configuration events. Call the corresponding runtime * config function, add the query to call the configuration event *************** *** 642,652 **** need_reloadListen = true; - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- STORE_NODE"); - if (rc < 0) - slon_retry(); - } - } else if (strcmp(event->ev_type, "ENABLE_NODE") == 0) --- 722,725 ---- *************** *** 663,673 **** need_reloadListen = true; - - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- ENABLE_NODE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "DROP_NODE") == 0) --- 736,739 ---- *************** *** 719,728 **** need_reloadListen = true; - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- DROP_NODE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "STORE_PATH") == 0) --- 785,788 ---- *************** *** 742,751 **** need_reloadListen = true; - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- STORE_PATH"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "DROP_PATH") == 0) --- 802,805 ---- *************** *** 763,772 **** need_reloadListen = true; - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- DROP_PATH"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "STORE_LISTEN") == 0) --- 817,820 ---- *************** *** 783,792 **** rtcfg_namespace, li_origin, li_provider, li_receiver); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- STORE_LISTEN"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "DROP_LISTEN") == 0) --- 831,834 ---- *************** *** 803,813 **** rtcfg_namespace, li_origin, li_provider, li_receiver); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- DROP_LISTEN"); - if (rc < 0) - slon_retry(); - } - } else if (strcmp(event->ev_type, "STORE_SET") == 0) --- 845,848 ---- *************** *** 824,834 **** rtcfg_namespace, set_id, set_origin, set_comment); - - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- STORE_SET"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "DROP_SET") == 0) --- 859,862 ---- *************** *** 851,857 **** " where ssy_setid= %d;", rtcfg_namespace, set_id); ! if (archive_open(node, seqbuf) < 0 || ! archive_append_ds(node, &lsquery) < 0 || ! archive_close(node) < 0) slon_retry(); } --- 879,883 ---- " where ssy_setid= %d;", rtcfg_namespace, set_id); ! if (archive_append_ds(node, &lsquery) < 0) slon_retry(); } *************** *** 875,885 **** if (archive_dir) { ! rc = slon_mkquery(&lsquery, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, add_id); ! if (archive_open(node, seqbuf) < 0 || ! archive_append_ds(node, &lsquery) < 0 || ! archive_close(node) < 0) slon_retry(); } --- 901,909 ---- if (archive_dir) { ! slon_mkquery(&lsquery, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, add_id); ! if (archive_append_ds(node, &lsquery) < 0) slon_retry(); } *************** *** 892,901 **** * in the runtime configuration. */ - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_ADD_TABLE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0) --- 916,919 ---- *************** *** 906,915 **** * maintained in the runtime configuration. */ - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_ADD_SEQUENCE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0) --- 924,927 ---- *************** *** 920,929 **** rtcfg_namespace, tab_id); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_DROP_TABLE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0) --- 932,935 ---- *************** *** 934,943 **** rtcfg_namespace, seq_id); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_DROP_SEQUENCE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0) --- 940,943 ---- *************** *** 949,958 **** rtcfg_namespace, tab_id, new_set_id); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_MOVE_TABLE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0) --- 949,952 ---- *************** *** 964,973 **** rtcfg_namespace, seq_id, new_set_id); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SET_MOVE_SEQUENCE"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0) --- 958,961 ---- *************** *** 980,989 **** rtcfg_namespace, trig_tabid, trig_tgname); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- STORE_TRIGGER"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0) --- 968,971 ---- *************** *** 996,1005 **** rtcfg_namespace, trig_tabid, trig_tgname); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- DROP_TRIGGER"); - if (rc < 0) - slon_retry(); - } } else if (strcmp(event->ev_type, "ACCEPT_SET") == 0) --- 978,981 ---- *************** *** 1099,1102 **** --- 1075,1079 ---- query_execute(node, local_dbconn, &query1); slon_log(SLON_DEBUG2, "ACCEPT_SET - done\n"); + archive_close(node); slon_retry(); *************** *** 1173,1182 **** failed_node, backup_node, set_id, seqbuf); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- FAILOVER_SET"); - if (rc < 0) - slon_retry(); - } need_reloadListen = true; } --- 1150,1153 ---- *************** *** 1195,1204 **** rtcfg_namespace, sub_set, sub_provider, sub_receiver, sub_forward); - if (archive_dir) - { - rc = archive_void_log(node, seqbuf, "-- SUBSCRIBE_SET"); - if (rc < 0) - slon_retry(); - } need_reloadListen = true; } --- 1166,1169 ---- *************** *** 1335,1341 **** " where ssy_setid= %d;", rtcfg_namespace, sub_set); ! if (archive_open(node, seqbuf) < 0 || ! archive_append_ds(node, &lsquery) < 0 || ! archive_close(node) < 0) slon_retry(); } --- 1300,1304 ---- " where ssy_setid= %d;", rtcfg_namespace, sub_set); ! if (archive_append_ds(node, &lsquery) < 0) slon_retry(); } *************** *** 1354,1360 **** rtcfg_namespace, reset_config_setid, reset_configonly_on_node); - if (archive_dir) - if (archive_void_log(node, seqbuf, "-- RESET_CONFIG") < 0) - slon_retry(); } else --- 1317,1320 ---- *************** *** 1362,1368 **** printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n", node->no_id, event->ev_origin, event->ev_seqno, event->ev_type); - if (archive_dir) - if (archive_void_log(node, seqbuf, "-- UNHANDLED EVENT!!!") < 0) - slon_retry(); } --- 1322,1325 ---- *************** *** 1375,1382 **** --- 1332,1342 ---- query_append_event(&query1, event); slon_appendquery(&query1, "commit transaction;"); + if (archive_close(node) < 0) + slon_retry(); } else { slon_mkquery(&query1, "rollback transaction;"); + archive_terminate(node); } if (query_execute(node, local_dbconn, &query1) < 0) *************** *** 2403,2429 **** sprintf(seqbuf, INT64_FORMAT, event->ev_seqno); - - /* Log Shipping Support begins... */ - - /* - * - Open the log, put the header in Isn't it convenient that seqbuf was - * just populated??? :-) - */ - if (archive_dir) - { - rc = archive_open(node, seqbuf); - if (rc < 0) - { - slon_disconnectdb(pro_conn); - dstring_free(&query1); - dstring_free(&query2); - dstring_free(&query3); - dstring_free(&lsquery); - dstring_free(&indexregenquery); - archive_terminate(node); - return -1; - } - } - /* * Register this connection in sl_nodelock --- 2363,2366 ---- *************** *** 3053,3057 **** { slon_mkquery(&query1, ! "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname, nodeon73 ? "" : PQgetvalue(res3, 0, 0)); rc = archive_append_ds(node, &query1); --- 2990,2994 ---- { slon_mkquery(&query1, ! "delete from %s;\ncopy %s %s from stdin;", tab_fqname, tab_fqname, nodeon73 ? "" : PQgetvalue(res3, 0, 0)); rc = archive_append_ds(node, &query1); *************** *** 3783,3790 **** set_id, node->no_id, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, dstring_data(&ssy_action_list)); - PQclear(res1); dstring_free(&ssy_action_list); if (query_execute(node, loc_dbconn, &query1) < 0) { slon_disconnectdb(pro_conn); dstring_free(&query1); --- 3720,3727 ---- set_id, node->no_id, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, dstring_data(&ssy_action_list)); dstring_free(&ssy_action_list); if (query_execute(node, loc_dbconn, &query1) < 0) { + PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); *************** *** 3808,3811 **** --- 3745,3749 ---- " could not insert to sl_setsync_offline", node->no_id); + PQclear(res1); slon_disconnectdb(pro_conn); dstring_free(&query1); *************** *** 3818,3821 **** --- 3756,3760 ---- } } + PQclear(res1); gettimeofday(&tv_now, NULL); slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " *************** *** 3824,3844 **** TIMEVAL_DIFF(&tv_start2, &tv_now)); - if (archive_dir) - { - rc = archive_close(node); - if (rc < 0) - { - slon_disconnectdb(pro_conn); - dstring_free(&query1); - dstring_free(&query2); - dstring_free(&query3); - dstring_free(&lsquery); - dstring_free(&indexregenquery); - archive_terminate(node); - return -1; - } - } - - /* * Roll back the transaction we used on the provider and close the --- 3763,3766 ---- *************** *** 5495,5498 **** --- 5417,5423 ---- int rc; + if (!archive_dir) + return 0; + if (node->archive_name == NULL) { *************** *** 5508,5511 **** --- 5433,5444 ---- } + if (node->archive_fp != NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "archive_open() called - archive is already opened\n", + node->no_id); + return -1; + } + sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir, node->no_id); *************** *** 5550,5590 **** int rc = 0; ! if (archive_dir) { ! rc = fprintf(node->archive_fp, ! "\n------------------------------------------------------------------\n" ! "-- End Of Archive Log\n" ! "------------------------------------------------------------------\n" ! "commit;\n" ! "vacuum analyze %s.sl_setsync_offline;\n", ! rtcfg_namespace); ! if (rc < 0) ! { ! archive_terminate(node); ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot write to archive file %s - %s\n", ! node->no_id, node->archive_temp, strerror(errno)); ! return -1; ! } ! rc = fclose(node->archive_fp); ! node->archive_fp = NULL; ! if (rc != 0) ! { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot close archive file %s - %s\n", ! node->no_id, node->archive_temp, strerror(errno)); ! return -1; ! } ! rc = rename(node->archive_temp, node->archive_name); ! if (rc != 0) ! { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot rename archive file %s to %s - %s\n", ! node->no_id, node->archive_temp, node->archive_name, ! strerror(errno)); ! return -1; ! } } --- 5483,5531 ---- int rc = 0; ! if (!archive_dir) ! return 0; ! ! if (node->archive_fp == NULL) { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot close archive file %s - not open\n", ! node->no_id, node->archive_temp); ! return -1; ! } ! rc = fprintf(node->archive_fp, ! "\n------------------------------------------------------------------\n" ! "-- End Of Archive Log\n" ! "------------------------------------------------------------------\n" ! "commit;\n" ! "vacuum analyze %s.sl_setsync_offline;\n", ! rtcfg_namespace); ! if (rc < 0) ! { ! archive_terminate(node); ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot write to archive file %s - %s\n", ! node->no_id, node->archive_temp, strerror(errno)); ! return -1; ! } ! rc = fclose(node->archive_fp); ! node->archive_fp = NULL; ! if (rc != 0) ! { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot close archive file %s - %s\n", ! node->no_id, node->archive_temp, strerror(errno)); ! return -1; ! } ! ! rc = rename(node->archive_temp, node->archive_name); ! if (rc != 0) ! { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: " ! "Cannot rename archive file %s to %s - %s\n", ! node->no_id, node->archive_temp, node->archive_name, ! strerror(errno)); ! return -1; } *************** *** 5617,5620 **** --- 5558,5572 ---- int rc; + if (!archive_dir) + return 0; + + if (node->archive_fp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - not open\n", + node->no_id, node->archive_temp); + return -1; + } + rc = fprintf(node->archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" *************** *** 5644,5647 **** --- 5596,5610 ---- int rc; + if (!archive_dir) + return 0; + + if (node->archive_fp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - not open\n", + node->no_id, node->archive_temp); + return -1; + } + rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds)); if (rc < 0) *************** *** 5665,5668 **** --- 5628,5642 ---- int rc; + if (!archive_dir) + return 0; + + if (node->archive_fp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - not open\n", + node->no_id, node->archive_temp); + return -1; + } + rc = fprintf(node->archive_fp, "%s\n", s); if (rc < 0) *************** *** 5688,5691 **** --- 5662,5676 ---- int rc; + if (!archive_dir) + return 0; + + if (node->archive_fp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - not open\n", + node->no_id, node->archive_temp); + return -1; + } + rc = fwrite(s, len, 1, node->archive_fp); if (rc != 1) *************** *** 5701,5727 **** /* ---------- - * archive_void_log - * - * writes out a "void" log consisting of the message which must either - * be a valid SQL query or a SQL comment. - * ---------- - */ - static int - archive_void_log(SlonNode *node, char *seqbuf, const char *message) - { - int rc; - - rc = archive_open(node, seqbuf); - if (rc < 0) - return rc; - rc = archive_append_str(node, message); - if (rc < 0) - return rc; - rc = archive_close(node); - - return rc; - } - - /* ---------- * given a string consisting of a list of actionseq values, return a * string that compresses this into a set of log_actionseq ranges --- 5686,5689 ---- *************** *** 6130,6144 **** if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) { - - if (archive_open(node, seqbuf) < 0) - slon_retry(); - if (archive_tracking(node, rtcfg_namespace, - ddl_setid, seqbuf, seqbuf, - event->ev_timestamp_c) < 0) - slon_retry(); if (archive_append_str(node, ddl_script) < 0) slon_retry(); - if (archive_close(node) < 0) - slon_retry(); } } --- 6092,6097 ----
- Previous message: [Slony1-commit] slony1-engine/src/ducttape Makefile test_8_logship.in
- Next message: [Slony1-commit] slony1-engine configure
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list