Tue Dec 6 20:59:23 PST 2005
Log Message:
-----------
Restructuring of the sync_helper thread in preparation for fixing
the out-of-memory problems on large rows.
Jan
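
The heart of the restructuring is visible in the remote_worker.c diff below:
the single alloc_lines counter is replaced by three indexes (data_line_alloc,
data_line_first, data_line_last) that cycle over a fixed array of line
buffers, so one fetch result can be drained across several delivery rounds.
What follows is a minimal, self-contained sketch of that cycling scheme only;
it borrows the new variable names but omits the shared buffer pool,
wd->workdata_lock, and the condition variables, and it stores one row per
buffer where the real code packs SLON_COMMANDS_PER_LINE commands into each
line.

    #include <stdio.h>

    #define FETCH_SIZE 10    /* stands in for SLON_DATA_FETCH_SIZE */
    #define TOTAL_ROWS 25    /* pretend the cursor returned 25 rows */

    int main(void)
    {
        int data_line[FETCH_SIZE];  /* stands in for the WorkerGroupLine array */
        int data_line_alloc = 0;    /* buffers currently taken from the pool   */
        int data_line_first = 0;    /* next filled buffer to deliver           */
        int data_line_last = 0;     /* next free buffer to fill                */
        int row = 0;

        while (row < TOTAL_ROWS)
        {
            /* Take buffers from the (here: unbounded) pool when none are left. */
            if (data_line_alloc == 0)
                data_line_alloc = FETCH_SIZE;

            /* Fill buffers until we run out of rows or buffers; the real
             * code packs SLON_COMMANDS_PER_LINE commands into each line. */
            while (data_line_last < data_line_alloc && row < TOTAL_ROWS)
                data_line[data_line_last++] = row++;

            /* Deliver the filled buffers to the worker.  The real code does
             * this under wd->workdata_lock and signals wd->repldata_cond. */
            while (data_line_first < data_line_last)
            {
                printf("deliver buffer %d (row %d)\n",
                       data_line_first, data_line[data_line_first]);
                data_line_first++;
            }

            /* Every allocated buffer has cycled through: reset the indexes. */
            if (data_line_first == data_line_alloc)
                data_line_alloc = data_line_first = data_line_last = 0;
        }

        /* Any buffers still allocated but unfilled would go back to the
         * pool, as the "return unused line buffers" section below does. */
        return 0;
    }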
Modified Files:
--------------
slony1-engine/src/slon:
remote_worker.c (r1.101 -> r1.102)
slon.h (r1.55 -> r1.56)
-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.101
retrieving revision 1.102
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.101 -r1.102
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -4782,18 +4782,25 @@
SlonNode *node = wd->node;
PGconn *dbconn;
WorkerGroupLine *line = NULL;
- int line_no;
SlonDString query;
- PGresult *res;
- int ntuples;
- int tupno;
int errors;
- WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
int alloc_lines = 0;
struct timeval tv_start;
struct timeval tv_now;
int first_fetch;
+ WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
+ int data_line_alloc;
+ int data_line_first;
+ int data_line_last;
+
+ PGresult *res;
+ int ntuples;
+ int tupno;
+
+ int line_no;
+ int line_ncmds;
+
dstring_init(&query);
for (;;)
@@ -4827,6 +4834,10 @@
dbconn = provider->conn->dbconn;
pthread_mutex_unlock(&(provider->helper_lock));
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: got work to do\n",
+ node->no_id, provider->no_id);
+
errors = 0;
do
{
@@ -4858,6 +4869,7 @@
gettimeofday(&tv_start, NULL);
first_fetch = true;
+ res = NULL;
if (query_execute(node, dbconn, &query) < 0)
{
@@ -4865,74 +4877,137 @@
break;
}
+ slon_mkquery(&query, "fetch %d from LOG; ",
+ SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE);
+ data_line_alloc = 0;
+ data_line_first = 0;
+ data_line_last = 0;
+
+ res = NULL;
+ ntuples = 0;
+ tupno = 0;
+
+ while (!errors)
+ {
/*
- * Now fetch the log data and forward it via the line pool to the
- * main worker who pushes it into the local database.
+ * Deliver filled line buffers to the worker process.
*/
- alloc_lines = 0;
- while (errors == 0)
+ if (data_line_last > data_line_first)
{
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: deliver %d lines to worker\n",
+ node->no_id, provider->no_id,
+ data_line_last - data_line_first);
+
+ pthread_mutex_lock(&(wd->workdata_lock));
+ while (data_line_first < data_line_last)
+ {
+ DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail,
+ data_line[data_line_first]);
+ data_line_first++;
+ }
+ pthread_cond_signal(&(wd->repldata_cond));
+ pthread_mutex_unlock(&(wd->workdata_lock));
+ }
+
/*
- * Allocate at least some lines - ideally the whole fetch
- * size.
+ * If we cycled through all the allocated line buffers,
+ * reset the indexes.
*/
- while (alloc_lines == 0 && !errors)
+ if (data_line_first == data_line_alloc)
{
- slon_log(SLON_DEBUG4,
- "remoteHelperThread_%d_%d: allocate lines\n",
- node->no_id, provider->no_id);
+ data_line_alloc = 0;
+ data_line_first = 0;
+ data_line_last = 0;
+ }
/*
- * Wait until there are lines available in the pool.
+ * Make sure we are inside memory limits and that we
+ * have available line buffers.
*/
pthread_mutex_lock(&(wd->workdata_lock));
- while (wd->linepool_head == NULL &&
+ if (data_line_alloc == 0 /* || oversize */)
+ {
+ /*
+ * First make sure that the overall memory usage is
+ * inside bounds.
+ */
+ if (0 /* oversize */)
+ {
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: wait for oversize memory to free\n",
+ node->no_id, provider->no_id);
+
+ while (/* oversize && */
wd->workgroup_status == SLON_WG_BUSY)
{
pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
}
+ if (wd->workgroup_status != SLON_WG_BUSY)
+ {
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: abort operation\n",
+ node->no_id, provider->no_id);
+ errors++;
+ break;
+ }
+ }
/*
- * If any error occured somewhere in the group, the main
- * worker will set the status to ABORT.
+ * Second, make sure that we have at least 1 line
+ * buffer.
*/
+ if (data_line_alloc == 0)
+ {
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: allocate line buffers\n",
+ node->no_id, provider->no_id);
+ while (data_line_alloc == 0 && !errors)
+ {
+ while (wd->linepool_head == NULL &&
+ wd->workgroup_status == SLON_WG_BUSY)
+ {
+ pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
+ }
if (wd->workgroup_status != SLON_WG_BUSY)
{
slon_log(SLON_DEBUG4,
"remoteHelperThread_%d_%d: abort operation\n",
node->no_id, provider->no_id);
- pthread_mutex_unlock(&(wd->workdata_lock));
errors++;
break;
}
/*
- * So far so good. Fill our array of lines from the pool.
+ * While we are at it, we might as well allocate
+ * up to FETCH_SIZE buffers.
*/
- while (alloc_lines < SLON_DATA_FETCH_SIZE &&
+ while (data_line_alloc < SLON_DATA_FETCH_SIZE &&
wd->linepool_head != NULL)
{
- data_line[alloc_lines] = wd->linepool_head;
+ data_line[data_line_alloc] = wd->linepool_head;
DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail,
- data_line[alloc_lines]);
- alloc_lines++;
+ data_line[data_line_alloc]);
+ data_line_alloc++;
}
- pthread_mutex_unlock(&(wd->workdata_lock));
}
-
- if (errors)
- break;
-
- slon_log(SLON_DEBUG4,
- "remoteHelperThread_%d_%d: have %d line buffers\n",
- node->no_id, provider->no_id, alloc_lines);
+ }
+ }
+ pthread_mutex_unlock(&(wd->workdata_lock));
/*
- * Now that we have allocated some buffer space, try to fetch
- * that many rows from the cursor.
+ * We are within memory limits and have allocated
+ * line buffers. Make sure that we have log lines
+ * fetched.
*/
- slon_mkquery(&query, "fetch %d from LOG; ",
- alloc_lines * SLON_COMMANDS_PER_LINE);
+ if (tupno >= ntuples)
+ {
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: fetch from cursor\n",
+ node->no_id, provider->no_id);
+ if (res != NULL)
+ PQclear(res);
+
res = PQexec(dbconn, dstring_data(&query));
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
@@ -4940,7 +5015,6 @@
node->no_id, provider->no_id,
dstring_data(&query),
PQresultErrorMessage(res));
- PQclear(res);
errors++;
break;
}
@@ -4955,16 +5029,34 @@
first_fetch = false;
}
- /*
- * Fill the line buffers with queries from the retrieved log
- * rows.
- */
- line_no = 0;
ntuples = PQntuples(res);
- slon_log(SLON_DEBUG3,
- "remoteHelperThread_%d_%d: got %d log rows\n",
+ tupno = 0;
+
+ slon_log(SLON_DEBUG4,
+ "remoteHelperThread_%d_%d: fetched %d log rows\n",
node->no_id, provider->no_id, ntuples);
- for (tupno = 0; tupno < ntuples; tupno++)
+ }
+
+ /*
+ * If there are no more tuples, we're done
+ */
+ if (ntuples == 0)
+ break;
+
+ /*
+ * Now move tuples from the fetch result into the
+ * line buffers.
+ */
+ line_no = data_line_last++;
+ line_ncmds = 0;
+
+ line = data_line[line_no];
+ line->code = SLON_WGLC_ACTION;
+ line->provider = provider;
+ dstring_reset(&(line->data));
+ dstring_reset(&(line->log));
+
+ while (tupno < ntuples && line_no < data_line_alloc)
{
char *log_origin = PQgetvalue(res, tupno, 0);
char *log_xid = PQgetvalue(res, tupno, 1);
@@ -4974,14 +5066,7 @@
char *log_cmdtype = PQgetvalue(res, tupno, 4);
char *log_cmddata = PQgetvalue(res, tupno, 5);
- if (tupno % SLON_COMMANDS_PER_LINE == 0)
- {
- line = data_line[line_no++];
- line->code = SLON_WGLC_ACTION;
- line->provider = provider;
- dstring_reset(&(line->data));
- dstring_reset(&(line->log));
- }
+ tupno++;
/*
* This can happen if the table belongs to a set that
@@ -4993,6 +5078,10 @@
wd->tab_fqname[log_tableid] == NULL)
continue;
+ /*
+ * If we are forwarding this set, add the insert
+ * into sl_log_?
+ */
if (wd->tab_forward[log_tableid])
{
slon_appendquery(&(line->log),
@@ -5005,6 +5094,10 @@
log_origin, log_xid, log_tableid,
log_actionseq, log_cmdtype, log_cmddata);
}
+
+ /*
+ * Add the actual replicating command to the line buffer
+ */
switch (*log_cmdtype)
{
case 'I':
@@ -5028,62 +5121,63 @@
log_cmddata);
break;
}
- }
- PQclear(res);
+ line_ncmds++;
- /*
- * Now put all the line buffers back. Filled ones into the
- * repldata, unused ones into the pool.
- */
- pthread_mutex_lock(&(wd->workdata_lock));
- for (tupno = 0; tupno < alloc_lines; tupno++)
+ if (line_ncmds >= SLON_COMMANDS_PER_LINE)
{
- if (tupno < line_no)
- DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail,
- data_line[tupno]);
- else
- DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail,
- data_line[tupno]);
- }
- if (line_no > 0)
- pthread_cond_signal(&(wd->repldata_cond));
- if (line_no < alloc_lines)
- pthread_cond_broadcast(&(wd->linepool_cond));
- pthread_mutex_unlock(&(wd->workdata_lock));
+ if (data_line_last >= data_line_alloc)
+ break;
+ line_no = data_line_last++;
+
+ line = data_line[line_no];
+ line->code = SLON_WGLC_ACTION;
+ line->provider = provider;
+ dstring_reset(&(line->data));
+ dstring_reset(&(line->log));
- slon_log(SLON_DEBUG3,
- "remoteHelperThread_%d_%d: %d log buffers delivered\n",
- node->no_id, provider->no_id, line_no);
+ line_ncmds = 0;
+ }
+ }
- if (line_no < alloc_lines)
+ /*
+ * Move one line back if we actually ran out of
+ * tuples on an exact SLON_COMMANDS_PER_LINE boundary.
+ */
+ if (line_ncmds == 0)
{
- slon_log(SLON_DEBUG4,
- "remoteHelperThread_%d_%d: no more log rows\n",
- node->no_id, provider->no_id);
- alloc_lines = 0;
- break;
- }
- alloc_lines = 0;
+ data_line_last--;
}
+ } /* Cursor returned EOF */
} while (0);
/*
* if there are still line buffers allocated, give them back.
*/
- if (alloc_lines > 0)
+ if (data_line_first < data_line_alloc)
{
slon_log(SLON_DEBUG4,
- "remoteHelperThread_%d_%d: return unused line buffers\n",
- node->no_id, provider->no_id);
+ "remoteHelperThread_%d_%d: return %d unused line buffers\n",
+ node->no_id, provider->no_id,
+ data_line_alloc - data_line_first);
pthread_mutex_lock(&(wd->workdata_lock));
- while (alloc_lines > 0)
+ while (data_line_first < data_line_alloc)
{
- alloc_lines--;
+ data_line_alloc--;
DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail,
- data_line[alloc_lines]);
+ data_line[data_line_alloc]);
}
pthread_cond_broadcast(&(wd->linepool_cond));
pthread_mutex_unlock(&(wd->workdata_lock));
+
+ data_line_alloc = 0;
+ data_line_first = 0;
+ data_line_last = 0;
+ }
+
+ if (res != NULL)
+ {
+ PQclear(res);
+ res = NULL;
}
/*
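
The cursor handling changes in the same spirit: rather than sizing each FETCH
to the number of line buffers currently allocated, the helper now issues a
constant fetch of SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE rows and
carries res, ntuples, and tupno across delivery rounds, refetching only once
the previous result is fully consumed. Below is a minimal libpq sketch of
that consumption pattern; the connection string and the pg_class cursor are
placeholders standing in for the provider connection and the sl_log
selection.

    #include <stdio.h>
    #include <libpq-fe.h>

    #define FETCH_ROWS 100  /* SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE */

    int main(void)
    {
        PGconn     *db = PQconnectdb("dbname=slony_test");
        PGresult   *res = NULL;
        char        fetchq[64];
        int         ntuples = 0;
        int         tupno = 0;

        if (PQstatus(db) != CONNECTION_OK)
        {
            fprintf(stderr, "connect: %s", PQerrorMessage(db));
            return 1;
        }

        /* Placeholder cursor; the real helper declares LOG over sl_log_1. */
        PQclear(PQexec(db,
            "begin; declare LOG cursor for select relname from pg_class;"));
        snprintf(fetchq, sizeof(fetchq), "fetch %d from LOG; ", FETCH_ROWS);

        for (;;)
        {
            /* Refetch only once the previous result is fully consumed. */
            if (tupno >= ntuples)
            {
                if (res != NULL)
                    PQclear(res);
                res = PQexec(db, fetchq);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                    break;
                ntuples = PQntuples(res);
                tupno = 0;
                if (ntuples == 0)   /* cursor returned EOF */
                    break;
            }

            /* The real code packs this row into the current line buffer. */
            printf("row %d: %s\n", tupno, PQgetvalue(res, tupno, 0));
            tupno++;
        }

        if (res != NULL)
            PQclear(res);
        PQfinish(db);
        return 0;
    }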
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.55 -r1.56
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -37,7 +37,7 @@
#else
#define SLON_COMMANDS_PER_LINE 10
#define SLON_DATA_FETCH_SIZE 10
-#define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 50)
+#define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 5)
#endif
#define SLON_MAX_PATH 1024
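
With the values shown above (SLON_COMMANDS_PER_LINE 10, SLON_DATA_FETCH_SIZE
10), the slon.h change shrinks the per-helper line pool tenfold, from 500
work lines to 50, which is consistent with the memory-reduction goal named in
the log message. A trivial snippet to check the arithmetic:

    #include <stdio.h>

    #define SLON_COMMANDS_PER_LINE 10
    #define SLON_DATA_FETCH_SIZE   10

    int main(void)
    {
        /* Pool size before and after this commit. */
        printf("old: %d work lines (%d commands) per helper\n",
               SLON_DATA_FETCH_SIZE * 50,
               SLON_DATA_FETCH_SIZE * 50 * SLON_COMMANDS_PER_LINE);
        printf("new: %d work lines (%d commands) per helper\n",
               SLON_DATA_FETCH_SIZE * 5,
               SLON_DATA_FETCH_SIZE * 5 * SLON_COMMANDS_PER_LINE);
        return 0;
    }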