Wed Dec 7 03:52:44 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Next message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Fix for the excessive memory allocation problem when replicating data with large attributes. New config options sync_max_rowsize (default 8k) sync_max_largemem (default 5M) Slon will try to keep the memory allocation "per provider" within 500 x sync_max_rowsize ... 500 x sync_max_rowsize + sync_max_largemem. With the default settings, this means 5-10 MB. Jan Modified Files: -------------- slony1-engine/share: slon.conf-sample (r1.3 -> r1.4) slony1-engine/src/slon: confoptions.h (r1.26 -> r1.27) remote_worker.c (r1.102 -> r1.103) slon.h (r1.56 -> r1.57) -------------- next part -------------- Index: slon.conf-sample =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/share/slon.conf-sample,v retrieving revision 1.3 retrieving revision 1.4 diff -Lshare/slon.conf-sample -Lshare/slon.conf-sample -u -w -r1.3 -r1.4 --- share/slon.conf-sample +++ share/slon.conf-sample @@ -31,6 +31,20 @@ # Range: [0,100], default: 6 #sync_group_maxsize=6 +# Size above which an sl_log_? row's log_cmddata is considered large. +# Up to 500 rows of this size are allowed in memory at once. Rows larger +# than that count into the sync_max_largemem space allocated and free'd +# on demand. +# Range: [1024,32768], default: 8192 +#sync_max_rowsize=8192 + +# Maximum amount of memory allowed for large rows. Note that the algorithm +# will stop fetching rows AFTER this amount is exceeded, not BEFORE. This +# is done to ensure that a single row exceeding this limit alone does not +# stall replication. +# Range: [1048576,1073741824], default: 5242880 +#sync_max_largemem=5242880 + # If this parameter is 1, messages go both to syslog and the standard # output. A value of 2 sends output only to syslog (some messages will # still go to the standard output/error). 
The default is 0, which means Index: confoptions.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v retrieving revision 1.26 retrieving revision 1.27 diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.26 -r1.27 --- src/slon/confoptions.h +++ src/slon/confoptions.h @@ -21,6 +21,8 @@ extern int slon_log_level; extern int sync_interval; extern int sync_interval_timeout; +extern int sync_max_rowsize; +extern int sync_max_largemem; extern int sync_group_maxsize; extern int desired_sync_time; @@ -218,6 +220,30 @@ 0, 2147483647 }, + { + { + (const char *)"sync_max_rowsize", /* conf name */ + gettext_noop("sl_log_? rows larger than that are read separately"), /* short desc */ + gettext_noop("sl_log_? rows larger than that are read separately"), /* long desc */ + SLON_C_INT /* config type */ + }, + &sync_max_rowsize, /* var name */ + 8192, /* default val */ + 1024, /* min val */ + 32768 /* max val */ + }, + { + { + (const char *)"sync_max_largemem", /* conf name */ + gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"), /* short desc */ + gettext_noop("How much memory to allow for sl_log_? 
rows exceeding sync_max_rowsize"), /* long desc */ + SLON_C_INT /* config type */ + }, + &sync_max_largemem, /* var name */ + 5242880, /* default val */ + 1048576, /* min val */ + 1073741824 /* max val */ + }, {0} }; Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.102 retrieving revision 1.103 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.102 -r1.103 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -171,6 +171,7 @@ pthread_mutex_t workdata_lock; WorkGroupStatus workgroup_status; + int workdata_largemem; pthread_cond_t repldata_cond; WorkerGroupLine *repldata_head; @@ -188,6 +189,7 @@ ProviderInfo *provider; SlonDString data; SlonDString log; + int line_largemem; WorkerGroupLine *prev; WorkerGroupLine *next; @@ -212,6 +214,8 @@ pthread_mutex_t node_confirm_lock = PTHREAD_MUTEX_INITIALIZER; int sync_group_maxsize; +int sync_max_rowsize; +int sync_max_largemem; int last_sync_group_size; int next_sync_group_size; @@ -304,6 +308,7 @@ pthread_mutex_lock(&(wd->workdata_lock)); wd->workgroup_status = SLON_WG_IDLE; wd->node = node; + wd->workdata_largemem = 0; wd->tab_fqname_size = SLON_MAX_PATH; wd->tab_fqname = (char **)malloc(sizeof(char *) * wd->tab_fqname_size); @@ -1700,6 +1705,7 @@ line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine)); memset(line, 0, sizeof(WorkerGroupLine)); + line->line_largemem = 0; dstring_init(&(line->data)); dstring_init(&(line->log)); DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail, @@ -4533,8 +4539,26 @@ for (wgline = lines_head; wgline; wgline = wgnext) { wgnext = wgline->next; + if (wgline->line_largemem > 0) + { + /* + * Really free the lines that contained large rows + */ + dstring_free(&(wgline->data)); + dstring_free(&(wgline->log)); + dstring_init(&(wgline->data)); + dstring_init(&(wgline->log)); + wd->workdata_largemem -= wgline->line_largemem; + 
wgline->line_largemem = 0; + } + else + { + /* + * just reset (and allow to grow further) the small ones + */ dstring_reset(&(wgline->data)); dstring_reset(&(wgline->log)); + } DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline); } if (num_errors == 1) @@ -4783,6 +4807,7 @@ PGconn *dbconn; WorkerGroupLine *line = NULL; SlonDString query; + SlonDString query2; int errors; int alloc_lines = 0; struct timeval tv_start; @@ -4795,6 +4820,7 @@ int data_line_last; PGresult *res; + PGresult *res2; int ntuples; int tupno; @@ -4802,6 +4828,7 @@ int line_ncmds; dstring_init(&query); + dstring_init(&query2); for (;;) { @@ -4862,8 +4889,13 @@ slon_mkquery(&query, "declare LOG cursor for select " " log_origin, log_xid, log_tableid, " - " log_actionseq, log_cmdtype, log_cmddata " + " log_actionseq, log_cmdtype, " + " octet_length(log_cmddata), " + " case when octet_length(log_cmddata) <= %d " + " then log_cmddata " + " else null end " "from %s.sl_log_1 %s order by log_actionseq; ", + sync_max_rowsize, rtcfg_namespace, dstring_data(&(provider->helper_qualification))); @@ -4926,19 +4958,20 @@ * have available line buffers. */ pthread_mutex_lock(&(wd->workdata_lock)); - if (data_line_alloc == 0 /* || oversize */) + if (data_line_alloc == 0 || + wd->workdata_largemem > sync_max_largemem) { /* * First make sure that the overall memory usage is * inside bounds. 
*/ - if (0 /* oversize */) + if (wd->workdata_largemem > sync_max_largemem) { slon_log(SLON_DEBUG4, "remoteHelperThread_%d_%d: wait for oversize memory to free\n", node->no_id, provider->no_id); - while (/* oversize && */ + while (wd->workdata_largemem > sync_max_largemem && wd->workgroup_status == SLON_WG_BUSY) { pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); @@ -5064,10 +5097,49 @@ NULL, 10); char *log_actionseq = PQgetvalue(res, tupno, 3); char *log_cmdtype = PQgetvalue(res, tupno, 4); - char *log_cmddata = PQgetvalue(res, tupno, 5); + int log_cmdsize = strtol(PQgetvalue(res, tupno, 5), + NULL, 10); + char *log_cmddata = PQgetvalue(res, tupno, 6); + int largemem = 0; tupno++; + if (log_cmdsize >= sync_max_rowsize) + { + slon_mkquery(&query2, + "select log_cmddata " + "from %s.sl_log_1 " + "where log_origin = '%s' " + " and log_xid = '%s' " + " and log_actionseq = '%s'", + rtcfg_namespace, + log_origin, log_xid, log_actionseq); + res2 = PQexec(dbconn, dstring_data(&query2)); + if (PQresultStatus(res2) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s", + node->no_id, provider->no_id, + dstring_data(&query), + PQresultErrorMessage(res2)); + PQclear(res2); + errors++; + break; + } + if (PQntuples(res2) != 1) + { + slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: large log_cmddata for actionseq %s not found\n", + node->no_id, provider->no_id, + dstring_data(&query), + log_actionseq); + PQclear(res2); + errors++; + break; + } + + log_cmddata = PQgetvalue(res2, 0, 0); + largemem = log_cmdsize; + } + /* * This can happen if the table belongs to a set that * already has a better sync status than the event we're @@ -5093,11 +5165,13 @@ rtcfg_namespace, log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata); + largemem *= 2; } /* * Add the actual replicating command to the line buffer */ + line->line_largemem += largemem; switch (*log_cmdtype) { case 'I': @@ -5137,6 +5211,30 @@ line_ncmds = 0; } + + /* + 
* If this was a large log_cmddata entry + * (> sync_max_rowsize), add this to the memory + * usage of the workgroup and check if we are + * exceeding limits. + */ + if (largemem > 0) + { + PQclear(res2); + pthread_mutex_lock(&(wd->workdata_lock)); + wd->workdata_largemem += largemem; + if (wd->workdata_largemem >= sync_max_largemem) + { + /* + * This is it ... we exit the loop here + * and wait for the worker to apply enough + * of the large rows first. + */ + pthread_mutex_unlock(&(wd->workdata_lock)); + break; + } + pthread_mutex_unlock(&(wd->workdata_lock)); + } } /* Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.56 retrieving revision 1.57 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.56 -r1.57 --- src/slon/slon.h +++ src/slon/slon.h @@ -509,6 +509,8 @@ * ---------- */ extern int sync_group_maxsize; +extern int sync_max_rowsize; +extern int sync_max_largemem; /* ----------
- Previous message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Next message: [Slony1-commit] By dpage: Comment out redundant query parameter and fix query so it
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list