Fri Feb 25 20:59:07 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Fix typo in comment - NO semantic change
- Next message: [Slony1-commit] By cbbrowne: Added additional comments about usage of log shipping
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Factored log shipping code that was integrated into the function sync_event() into separate functions so that they may be used for other events as well. Further functional changes: - archive header contains comment lines to indicate where the "header" ends and the "body" of queries begins. That will be useful for splitting the logs apart so that you can test-apply the header (to see if it's valid) before then applying the Whole Log. Blindly applying the Whole Thing is a bad idea because it introduces a boatload of parsing work for any log that is applied in an inappropriate order. - more error checking has been introduced. The "archive log" processing functions check return codes from fprintf() and such, and if they encounter errors, this will abort processing of the SYNC. There's still more to do in this regard; close_log_archive() does not yet check return codes... Modified Files: -------------- slony1-engine/src/slon: remote_worker.c (r1.74 -> r1.75) -------------- next part -------------- Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.74 retrieving revision 1.75 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.74 -r1.75 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -240,7 +240,18 @@ WorkerGroupData * wd, SlonWorkMsg_event * event); static void *sync_helper(void *cdata); -#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); if (archive_fp) { fclose(archive_fp); unlink(archive_tmp); } +static char archive_name[SLON_MAX_PATH]; +static char archive_tmp[SLON_MAX_PATH]; +static FILE *archive_fp = NULL; +static int open_log_archive (int node_id, char *seqbuf); +static void close_log_archive (); +static void terminate_log_archive (); +static int generate_archive_header (int node_id, char *seqbuf); +static int submit_query_to_archive(SlonDString *ds); +static int 
submit_string_to_archive (const char *s); +static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf); + +#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); /* * ---------- slon_remoteWorkerThread @@ -2931,6 +2942,7 @@ int num_errors; WorkerGroupLine *wgline; int i; + int rc; char seqbuf [64]; struct timeval tv_start; struct timeval tv_now; @@ -2939,10 +2951,6 @@ SlonDString query; SlonDString *provider_qual; - char archive_name[SLON_MAX_PATH]; - char archive_tmp[SLON_MAX_PATH]; - FILE *archive_fp = NULL; - gettimeofday(&tv_start, NULL); slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT " processing\n", @@ -2957,28 +2965,21 @@ */ if (archive_dir) { - int i; - - sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node->no_id); - for (i = strlen(seqbuf); i < 20; i++) - strcat(archive_name, "0"); - strcat(archive_name, seqbuf); - strcat(archive_name, ".sql"); - strcpy(archive_tmp, archive_name); - strcat(archive_tmp, ".tmp"); - - if ((archive_fp = fopen(archive_tmp, "w")) == NULL) - { + rc = open_log_archive(node->no_id, seqbuf); + if (rc == -1) { slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "Cannot open archive file %s - %s\n", node->no_id, archive_tmp, strerror(errno)); dstring_free(&query); return 60; } - fprintf(archive_fp, "-- Slony-I sync log\n" - "-- Event %d,%s\n" - "start transaction;\n", - node->no_id, seqbuf); + rc = generate_archive_header(node->no_id, seqbuf); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } /* @@ -3286,11 +3287,16 @@ * the archive log. This function ensures that all * archive log files are applied in the right order. 
*/ - if (archive_fp) + if (archive_dir) { - fprintf(archive_fp, "select %s.setsyncTracking_offline(%d, '%s', '%s');\n", - rtcfg_namespace, - sub_set, PQgetvalue(res1, tupno1, 1), seqbuf); + rc = logarchive_tracking(rtcfg_namespace, sub_set, + PQgetvalue(res1, tupno1, 1), seqbuf); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } } PQclear(res1); @@ -3321,12 +3327,8 @@ "no sets need syncing for this event\n", node->no_id); dstring_free(&query); - if (archive_fp) - { - fprintf(archive_fp, "commit;\n"); - fclose(archive_fp); - rename(archive_tmp, archive_name); - } + if (archive_dir) + close_log_archive(); return 0; } @@ -3441,11 +3443,16 @@ * Add the user data modification part to * the archive log. */ - if (archive_fp) - { - fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); + if (archive_dir) { + rc = submit_string_to_archive(dstring_data(&(wgline->data))); + /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */ + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } - break; case SLON_WGLC_DONE: @@ -3587,13 +3594,19 @@ /* * Add the sequence number adjust call to the archive log. */ - if (archive_fp) + if (archive_dir) { slon_mkquery(&query, "select %s.sequenceSetValue_offline(%s,'%s');\n", rtcfg_namespace, seql_seqid, seql_last_value); - fprintf(archive_fp, dstring_data(&query)); + rc = submit_query_to_archive(&query); + if (rc < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot write to archive file %s - %s", + node->no_id, archive_tmp, strerror(errno)); + return 60; + } } } PQclear(res1); @@ -3716,11 +3729,9 @@ * Add the final commit to the archive log, close it and rename * the temporary file to the real log chunk filename. 
*/ - if (archive_fp) + if (archive_dir) { - fprintf(archive_fp, "commit;\n"); - fclose(archive_fp); - rename(archive_tmp, archive_name); + close_log_archive(); } /* @@ -4014,7 +4025,7 @@ pthread_mutex_unlock(&(wd->workdata_lock)); slon_log(SLON_DEBUG3, - "remoteHelperThread_%d_%d: %d log buffers deliverd\n", + "remoteHelperThread_%d_%d: %d log buffers delivered\n", node->no_id, provider->no_id, line_no); if (line_no < alloc_lines) @@ -4113,6 +4124,83 @@ } } +/* Functions for processing log archives... + + - First, you open the log archive using open_log_archive() + + - Second, you generate the header using generate_archive_header() + + - Third, you need to set up the sync tracking function in the log + using logarchive_tracking() + + ============= Here Ends The Header of the Log Shipping Archive ================== + + Then come the various queries (inserts/deletes/updates) that + comprise the "body" of the SYNC. Probably submitted using + submit_query_to_archive(). + + ============= Here Ends The Body of the Log Shipping Archive ================== + + Finally, the log ends, notably with a COMMIT statement, generated + using close_log_archive(), which closes the file and renames it + from ".tmp" form to the final name. 
+*/ + + +/* Stores the archive name in archive_name (as .sql name) and archive_tmp (.tmp file) */ +int open_log_archive (int node_id, char *seqbuf) { + int i; + sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id); + for (i = strlen(seqbuf); i < 20; i++) + strcat(archive_name, "0"); + strcat(archive_name, seqbuf); + strcat(archive_name, ".sql"); + strcpy(archive_tmp, archive_name); + strcat(archive_tmp, ".tmp"); + archive_fp = fopen(archive_tmp, "w"); + if (archive_fp == NULL) { + return -1; + } else { + return 0; + } +} + +void close_log_archive () { + fprintf(archive_fp, "\n------------------------------------------------------------------\n-- End Of Archive Log\n------------------------------------------------------------------\ncommit;\n"); + fclose(archive_fp); + rename(archive_tmp, archive_name); +} + +int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, const char *seqbuf) { + return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s');\n-- end of log archiving header\n------------------------------------------------------------------\n-- start of Slony-I data\n------------------------------------------------------------------\n", + namespace, sub_set, firstseq, seqbuf); +} + +int submit_query_to_archive(SlonDString *ds) { + return fprintf(archive_fp, "%s\n", *ds->data); +} + +int submit_string_to_archive (const char *s) { + return fprintf(archive_fp, "%s\n", s); +} + +void terminate_log_archive () { + if (archive_fp) { + fclose(archive_fp); + } +} + +int generate_archive_header (int node_id, char *seqbuf) { + time_t now; + now = time(NULL); + return fprintf(archive_fp, + "-- Slony-I sync log\n" + "-- Node %d, Event %s\n" + "-- at... %s\n" + "start transaction;\n", + node_id, seqbuf, ctime(&now)); +} + /* * Local Variables: * tab-width: 4
- Previous message: [Slony1-commit] By cbbrowne: Fix typo in comment - NO semantic change
- Next message: [Slony1-commit] By cbbrowne: Added additional comments about usage of log shipping
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list