Jan Wieck wieck at lists.slony.info
Thu Jun 7 06:01:12 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv25549/src/slon

Modified Files:
	remote_worker.c 
Log Message:
Attempt to remove the sl_seqlog flood.

This patch adds a new function seqtrack(seqid,seqvalue) that will return
NULL if called with the same seqvalue before, the seqvalue instead. This
function is used to suppress sl_seqlog rows of sequences that didn't change
since the last SYNC. 

Jan


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.142
retrieving revision 1.143
diff -C2 -d -r1.142 -r1.143
*** remote_worker.c	6 Jun 2007 16:20:56 -0000	1.142
--- remote_worker.c	7 Jun 2007 13:01:10 -0000	1.143
***************
*** 3129,3133 ****
  	 */
  	(void) slon_mkquery(&query1,
! 				 "select SL.seql_seqid, SL.seql_last_value, "
  				 "    %s.slon_quote_brute(PGN.nspname) || '.' || "
  				 "    %s.slon_quote_brute(PGC.relname) as tab_fqname "
--- 3129,3133 ----
  	 */
  	(void) slon_mkquery(&query1,
! 				 "select SL.seql_seqid, max(SL.seql_last_value), "
  				 "    %s.slon_quote_brute(PGN.nspname) || '.' || "
  				 "    %s.slon_quote_brute(PGC.relname) as tab_fqname "
***************
*** 3137,3143 ****
  				 "	where SQ.seq_set = %d "
  				 "		and SL.seql_seqid = SQ.seq_id "
! 				 "		and SL.seql_ev_seqno = '%s' "
  				 "		and PGC.oid = SQ.seq_reloid "
! 				 "		and PGN.oid = PGC.relnamespace; ",
  				 rtcfg_namespace,
  				 rtcfg_namespace,
--- 3137,3144 ----
  				 "	where SQ.seq_set = %d "
  				 "		and SL.seql_seqid = SQ.seq_id "
! 				 "		and SL.seql_ev_seqno <= '%s' "
  				 "		and PGC.oid = SQ.seq_reloid "
! 				 "		and PGN.oid = PGC.relnamespace "
! 				 "  group by 1, 3; ",
  				 rtcfg_namespace,
  				 rtcfg_namespace,
***************
*** 3174,3204 ****
  
  
! 		/*
! 		 * sequence with ID 0 is a nodes rowid ... only remember in seqlog.
! 		 */
! 		if (strtol(seql_seqid, NULL, 10) != 0)
! 		{
! 			(void) slon_mkquery(&query1,
! 						 "select \"pg_catalog\".setval('%q', '%s'); ",
! 						 seq_fqname, seql_last_value);
  
! 			if (archive_dir)
! 			{
! 				rc = archive_append_ds(node, &query1);
! 				if (rc < 0) {
! 				  PQclear(res1);
! 				  slon_disconnectdb(pro_conn);
! 				  dstring_free(&query1);
! 				  dstring_free(&query2);
! 				  dstring_free(&query3);
! 				  dstring_free(&lsquery);
! 				  dstring_free(&indexregenquery);
! 				  archive_terminate(node);
! 				  return -1;
! 				}
  			}
  		}
! 		else
! 			dstring_reset(&query1);
  		slon_appendquery(&query1,
  						 "insert into %s.sl_seqlog "
--- 3175,3198 ----
  
  
! 		(void) slon_mkquery(&query1,
! 					 "select \"pg_catalog\".setval('%q', '%s'); ",
! 					 seq_fqname, seql_last_value);
  
! 		if (archive_dir)
! 		{
! 			rc = archive_append_ds(node, &query1);
! 			if (rc < 0) {
! 			  PQclear(res1);
! 			  slon_disconnectdb(pro_conn);
! 			  dstring_free(&query1);
! 			  dstring_free(&query2);
! 			  dstring_free(&query3);
! 			  dstring_free(&lsquery);
! 			  dstring_free(&indexregenquery);
! 			  archive_terminate(node);
! 			  return -1;
  			}
  		}
! 
  		slon_appendquery(&query1,
  						 "insert into %s.sl_seqlog "
***************
*** 3588,3591 ****
--- 3582,3586 ----
  
  	int			actionlist_len;
+ 	int64		min_ssy_seqno;
  
  	gettimeofday(&tv_start, NULL);
***************
*** 3740,3743 ****
--- 3735,3739 ----
  					 event->ev_maxxid_c);
  
+ 	min_ssy_seqno = -1;
  	for (provider = wd->provider_head; provider; provider = provider->next)
  	{
***************
*** 3806,3809 ****
--- 3802,3810 ----
  			char	   *ssy_xip = PQgetvalue(res1, tupno1, 4);
  			char	   *ssy_action_list = PQgetvalue(res1, tupno1, 5);
+ 			int64		ssy_seqno;
+ 
+ 			slon_scanint64(PQgetvalue(res1, tupno1, 1), &ssy_seqno);
+ 			if (min_ssy_seqno < 0 || ssy_seqno < min_ssy_seqno)
+ 				min_ssy_seqno = ssy_seqno;
  
  			/*
***************
*** 4281,4300 ****
  		int			ntuples1;
  		int			tupno1;
  
  		(void) slon_mkquery(&query,
! 					 "select SL.seql_seqid, SL.seql_last_value "
  					 "	from %s.sl_seqlog SL, "
  					 "		%s.sl_sequence SQ "
  					 "	where SQ.seq_id = SL.seql_seqid "
  					 "		and SL.seql_origin = %d "
! 					 "		and SL.seql_ev_seqno = '%s' "
  					 "		and SQ.seq_set in (",
  					 rtcfg_namespace, rtcfg_namespace,
! 					 node->no_id, seqbuf);
  		for (pset = provider->set_head; pset; pset = pset->next)
  			slon_appendquery(&query, "%s%d",
  							 (pset->prev == NULL) ? "" : ",",
  							 pset->set_id);
! 		slon_appendquery(&query, "); ");
  
  		res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
--- 4282,4306 ----
  		int			ntuples1;
  		int			tupno1;
+ 		char		min_ssy_seqno_buf[64];
+ 
+ 		sprintf(min_ssy_seqno_buf, INT64_FORMAT, min_ssy_seqno);
  
  		(void) slon_mkquery(&query,
! 					 "select SL.seql_seqid, max(SL.seql_last_value) "
  					 "	from %s.sl_seqlog SL, "
  					 "		%s.sl_sequence SQ "
  					 "	where SQ.seq_id = SL.seql_seqid "
  					 "		and SL.seql_origin = %d "
! 					 "		and SL.seql_ev_seqno <= '%s' "
! 					 "		and SL.seql_ev_seqno >= '%s' "
  					 "		and SQ.seq_set in (",
  					 rtcfg_namespace, rtcfg_namespace,
! 					 node->no_id, seqbuf, min_ssy_seqno_buf);
  		for (pset = provider->set_head; pset; pset = pset->next)
  			slon_appendquery(&query, "%s%d",
  							 (pset->prev == NULL) ? "" : ",",
  							 pset->set_id);
! 		slon_appendquery(&query, ") "
! 					"  group by 1; ");
  
  		res1 = PQexec(provider->conn->dbconn, dstring_data(&query));



More information about the Slony1-commit mailing list