Switch to side-by-side view

--- a
+++ b/feasible_joint_stiffness/lrslib-062/mplrs.c
@@ -0,0 +1,2027 @@
+/* mplrs.c: initial release of MPI version 
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+
+Author: Charles Jordan skip@ist.hokudai.ac.jp
+Based on plrs.cpp by Gary Roumanis
+Initial lrs Author: David Avis avis@cs.mcgill.ca
+ */
+
+/* #include "lrslib.h" */ /* included in mplrs.h */
+#include "mplrs.h"
+
+#include <mpi.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <time.h>
+
+/* global variables */
+mplrsv mplrs;       /* state of this process */
+masterv master;     /* state of the master   */
+consumerv consumer; /* state of the consumer */
+
+int PLRS_DEBUG = 0; 
+
+/******************
+ * initialization *
+ ******************/
+
+int main(int argc, char **argv)
+{
+	mplrs_init(argc, argv);
+
+	mprintf2(("%d: initialized on %s\n",mplrs.rank,mplrs.host));
+
+	if (mplrs.size<3)
+		return mplrs_fallback();
+
+	if (mplrs.rank == MASTER)
+		return mplrs_master();
+	else if (mplrs.rank == CONSUMER)
+		return mplrs_consumer();
+	else
+		return mplrs_worker();
+}
+
+void mplrs_init(int argc, char **argv)
+{
+	int i,j;
+	int count;
+	char c;
+	time_t curt = time(NULL);
+	char *tim, *tim1;
+
+	/* make timestamp for filenames */
+	tim = ctime(&curt);
+	tim1 = tim+4;
+	tim1[3] = tim1[6] = '_';
+	tim1[15]='\0';
+
+	/* start MPI */
+	MPI_Init(&argc, &argv);
+	MPI_Comm_rank(MPI_COMM_WORLD, &mplrs.rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &mplrs.size);
+	MPI_Get_processor_name(mplrs.host, &count);
+
+	/* allocate mp for volume calculation, from plrs */
+	lrs_alloc_mp(mplrs.tN);   lrs_alloc_mp(mplrs.tD); 
+	lrs_alloc_mp(mplrs.Vnum); lrs_alloc_mp(mplrs.Vden);
+	lrs_alloc_mp(mplrs.Tnum); lrs_alloc_mp(mplrs.Tden);
+	itomp(ZERO, mplrs.Vnum);  itomp(ONE, mplrs.Vden);
+
+	gettimeofday(&mplrs.start, NULL);
+
+	mplrs_initstrucs(); /* initialize default values of globals */
+
+	/* process commandline arguments, set input and output files */
+	mplrs_commandline(argc, argv);
+
+	if (mplrs.rank == MASTER || mplrs.rank == CONSUMER)
+	{
+		/* open input file for reading on master, open output file for
+	 	 * writing on consumer, open histogram file for writing on
+	 	 * master -- if these files are to be used.
+	 	 */
+		mplrs_initfiles();
+		if (mplrs.rank == MASTER)
+			master_sendfile();
+	}
+	else
+	{
+		/* receive input file from master */
+		MPI_Recv(&count, 1, MPI_INT, 0, 20, MPI_COMM_WORLD, 
+			 MPI_STATUS_IGNORE);
+		mplrs.input = (char *)malloc(sizeof(char)*(count+1));
+		MPI_Recv(mplrs.input, count, MPI_CHAR, 0, 20, 
+			 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+		mplrs.input[count] = '\0';
+
+		/* get number of chars needed for worker files */
+		j = mplrs.size;
+		for (i=1; j>9; i++)
+			j = j/10;
+		i += 6+strlen(tim1); /* mplrs_TIMESTAMP */
+		i += strlen(mplrs.tfn_prefix) + strlen(mplrs.input_filename) + 
+		     i + 6; /* _%d.ine\0 */
+		mplrs.tfn = (char *)malloc(sizeof(char) * i);
+		sprintf(mplrs.tfn, "%smplrs_%s%s_%d.ine", 
+						  mplrs.tfn_prefix, tim1, 
+						  mplrs.input_filename,
+					 	  mplrs.rank);
+		/* flatten directory structure in mplrs.input_filename
+		 * for mplrs.tfn, to prevent writing to non-existent
+		 * subdirectories in e.g. /tmp
+		 */
+		i = strlen(mplrs.tfn_prefix) + 6 + strlen(tim1)+1;
+		j = strlen(mplrs.tfn);
+		for (; i<j; i++)
+		{
+			c = mplrs.tfn[i];
+			if (c == '/' || c == '\\')
+				mplrs.tfn[i] = '_';
+		}
+		
+	}
+
+	/* setup signals --
+         * TERM checkpoint to checkpoint file, or output file if none & exit
+	 * HUP ditto
+         */
+	signal(SIGTERM, mplrs_caughtsig);
+	signal(SIGHUP, mplrs_caughtsig);
+}
+
+/* if we catch a signal, set a flag to exit */
+void mplrs_caughtsig(int sig)
+{
+	if (mplrs.caughtsig<2)
+		mplrs.caughtsig = 1;
+	signal(sig, mplrs_caughtsig);
+	return;
+}
+
+/* if we've caught a signal, tell the master about it */
+void mplrs_handlesigs(void)
+{
+	float junk = 0;
+	if (mplrs.caughtsig != 1)
+		return;
+	/* we want to stop, so no need to hurry and queue the send */
+	MPI_Send(&junk, 1, MPI_FLOAT, MASTER, 9, MPI_COMM_WORLD);
+	mplrs.caughtsig = 2; /* handled */
+	return;
+}
+
+/* send the contents of the input file to all workers */
+void master_sendfile(void)
+{
+	char *buf;
+	int count=0;
+	int c, i;
+
+	c = fgetc(master.input);
+
+	while (c!=EOF)
+	{
+		count++;
+		c = fgetc(master.input);
+	}
+
+	buf = (char *)malloc(sizeof(char)*(count));
+
+	fseek(master.input, 0, SEEK_SET);
+	
+	for (i=0; i<count; i++)
+	{
+		c = fgetc(master.input);
+		buf[i]=c;
+	}
+
+	for (i=0; i<mplrs.size; i++)
+	{
+		if (i==MASTER || i==CONSUMER)
+			continue;
+		mprintf2(("M: Sending input file to %d\n", i));
+		MPI_Send(&count, 1, MPI_INT, i, 20, MPI_COMM_WORLD);
+		MPI_Send(buf, count, MPI_CHAR, i, 20, MPI_COMM_WORLD);
+	}
+	fseek(master.input, 0, SEEK_SET);
+	free(buf);
+}
+
+/* initialize default values of global structures, before commandline */
+void mplrs_initstrucs(void)
+{
+	mplrs.cobasis_list = NULL;
+
+	mplrs.caughtsig = 0;
+	mplrs.rays = 0;
+	mplrs.vertices = 0;
+	mplrs.bases = 0;
+	mplrs.facets = 0;
+	mplrs.intvertices = 0;
+
+	mplrs.my_tag = 100;
+	mplrs.tfn_prefix = DEF_TEMP;
+	mplrs.tfn = NULL;
+	mplrs.tfile = NULL;
+	mplrs.input_filename = DEF_INPUT;
+	mplrs.input = NULL;
+	mplrs.output_list = NULL;
+	mplrs.outnum = 0;
+	mplrs.maxbuf = DEF_MAXBUF; /* maximum # lines to buffer */
+	mplrs.countonly = 0;
+
+	master.cobasis_list = NULL;
+	master.size_L = 0;
+	master.tot_L = 0;
+	master.num_empty = 0;
+	master.num_producers = 0;
+	master.checkpointing = 0;
+	master.lmin = DEF_LMIN;
+	master.lmax = DEF_LMAX;
+	master.scalec = DEF_SCALEC;
+	master.initdepth = DEF_ID;
+	master.maxdepth = DEF_MAXD;
+	master.maxcobases = DEF_MAXC;
+	master.maxncob = DEF_MAXNCOB;
+	master.time_limit = 0;
+	master.hist_filename = DEF_HIST;
+	master.hist = NULL;
+	master.doing_histogram = 0;
+	master.freq_filename = DEF_FREQ;
+	master.freq = NULL;
+	master.restart_filename = DEF_RESTART;
+	master.restart = NULL;
+	master.checkp_filename = DEF_CHECKP;
+	master.checkp = NULL;
+	master.stop_filename = DEF_STOP;
+	master.stop = NULL;
+	master.input = NULL;
+
+	consumer.prodreq = NULL;
+	consumer.prodibf = NULL;
+	consumer.incoming = NULL;
+	consumer.output_filename = DEF_OUTPUT;
+	consumer.output = stdout;
+	consumer.num_producers = 0;
+	consumer.checkpoint = 0;
+	consumer.waiting_initial = 1;
+}
+
+/* process commandline arguments */
+void mplrs_commandline(int argc, char **argv)
+{
+	int i, arg, firstfile=1;
+	for (i=1; i<argc; i++)
+	{
+		if (!strcmp(argv[i], "-lmin"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg < 1  )
+				bad_args();
+			master.lmin = arg;
+                        if (master.lmin > master.lmax)
+                           master.lmax = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-lmax"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1 )
+				bad_args();
+			master.lmax = arg;
+                        if (master.lmin > master.lmax)
+                           master.lmin = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-scale"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1)
+				bad_args();
+			master.scalec = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-hist"))
+		{
+			master.hist_filename = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-freq"))
+		{
+			master.freq_filename = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-stopafter"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1)
+				bad_args();
+			master.maxncob = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-countonly"))
+		{
+			mplrs.countonly = 1;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-maxbuf"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1)	
+				bad_args();
+			mplrs.maxbuf = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-id"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<0)
+				bad_args();
+			master.initdepth = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-maxd"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1)
+				bad_args();
+			master.maxdepth = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-maxc"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<0)
+				bad_args();
+			master.maxcobases = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-checkp"))
+		{
+			master.checkp_filename = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-stop"))
+		{
+			master.stop_filename = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-time"))
+		{
+			arg = atoi(argv[i+1]);
+			i++;
+			if (arg<1)
+				bad_args();
+			master.time_limit = arg;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-restart"))
+		{
+			master.restart_filename = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (!strcmp(argv[i], "-temp"))
+		{
+			mplrs.tfn_prefix = argv[i+1];
+			i++;
+			continue;
+		}
+		else if (firstfile == 1)
+		{
+			mplrs.input_filename = argv[i];
+			firstfile = 2;
+		}
+		else if (firstfile == 2)
+		{
+			consumer.output_filename = argv[i];
+			firstfile = 3;
+		}
+		else
+			bad_args();
+	}
+	if (master.lmax == -1)
+		master.lmax = (master.lmin>0? master.lmin: 0);
+	if (mplrs.input_filename==NULL) /* need an input file */
+		bad_args();
+	if ((master.stop_filename!=NULL || master.time_limit!=0)  && 
+	    master.checkp_filename==NULL)
+		bad_args(); /* need checkpoint file if stop condition given */
+}
+
+/* open input file on master, histogram (if exists) on master, 
+ * output (if exists) on consumer
+ */
+void mplrs_initfiles(void)
+{
+	if (mplrs.rank == MASTER)
+	{
+		master.input = fopen(mplrs.input_filename, "r");
+		if (master.input == NULL)
+		{
+			printf("Unable to open %s for reading [%s].\n",
+				mplrs.input_filename, mplrs.host);
+			/* MPI_Finalize(); */
+			exit(0);
+		}
+		if (master.hist_filename != NULL)
+		{
+			master.hist = fopen(master.hist_filename, "w");
+			if (master.hist == NULL)
+			{
+				printf("Unable to open %s for writing [%s].\n",
+				       master.hist_filename, mplrs.host);
+				/* MPI_Finalize(); */
+				exit(0);
+			}
+			master.doing_histogram = 1;
+			mprintf2(("M: Prepared histogram (%s)\n", master.hist_filename));
+		}
+		if (master.freq_filename != NULL)
+		{
+			master.freq = fopen(master.freq_filename, "w");
+			if (master.freq == NULL)
+			{
+				printf("Unable to open %s for writing [%s].\n",
+					master.freq_filename, mplrs.host);
+				exit(0);
+			}
+			mprintf2(("M: Prepared frequency file (%s)\n",
+				 master.freq_filename));
+		}
+		if (master.checkp_filename !=NULL)
+		{
+			master.checkp = fopen(master.checkp_filename, "w");
+			if (master.checkp == NULL)
+			{
+				printf("Unable to open %s for writing [%s].\n",
+					master.checkp_filename, mplrs.host);
+				/* MPI_Finalize(); */
+				exit(0);
+			}
+			mprintf2(("M: Prepared checkpoint file (%s)\n",
+				  master.checkp_filename));
+		}
+		if (master.restart_filename != NULL)
+		{
+			master.restart = fopen(master.restart_filename, "r");
+			if (master.restart == NULL)
+			{
+				printf("Unable to open %s for reading [%s].\n",
+				       master.restart_filename, mplrs.host);
+				/* MPI_Finalize(); */
+				exit(0);
+			}
+			mprintf2(("M: Opened restart file (%s)\n",
+				  master.restart_filename));
+		}
+	}
+	if (mplrs.rank == CONSUMER)
+	{
+		if (consumer.output_filename == NULL)
+			return;
+		consumer.output = fopen(consumer.output_filename, "w");
+		if (consumer.output == NULL)
+		{
+			printf("Unable to open %s for writing [%s].\n",
+			       consumer.output_filename, mplrs.host);
+			/* MPI_Finalize(); */
+			exit(0);
+		}
+	}
+}
+
+
+/* Bad commandline arguments. Complain and die. */
+void bad_args(void)
+{
+	if (mplrs.rank == CONSUMER)
+		printf("Invalid arguments.\n%s\n", USAGE);
+	MPI_Finalize();
+	exit(0);
+}
+
+/* fallback -- meaningless if <3 processes
+ * better would be to fallback to normal lrs if <4 processes
+ */
+int mplrs_fallback(void)
+{
+	if (mplrs.rank==0)
+		printf("mplrs requires at least 3 processes.\n");
+	MPI_Finalize();
+	exit(0);
+}
+
+/**********		
+ * master *
+ **********/
+
+int mplrs_master(void)
+{
+	int i;
+	int loopiter = 0;  /* do some things only sometimes */
+	MPI_Request ign = MPI_REQUEST_NULL;   /* a request we'll ignore */
+	timeval cur, last; /* for histograms */
+	int flag = 0;;
+	int phase = 1;     /* In phase1? */
+	int done = -1;     /* need a negative int to send to finished workers */
+	float junk=0; 	   /* need a buffer for incoming signal reports */
+	int ncob=0; /* for printing sizes of sub-problems and for -stopafter*/
+	unsigned long tot_ncob = 0;
+	int want_stop = 0;
+
+	gettimeofday(&last, NULL);
+
+	master.num_producers = 0; /* nobody working right now */
+	master.act_producers = (unsigned int *)malloc(sizeof(unsigned int)*mplrs.size);
+	master.live_workers = mplrs.size - 2;
+	master.workin = (int *)malloc(sizeof(int)*mplrs.size);
+	master.mworkers = (MPI_Request *)malloc(sizeof(MPI_Request)*mplrs.size);
+	master.incoming = NULL;
+	master.sigcheck = (MPI_Request *)malloc(sizeof(MPI_Request)*mplrs.size);
+
+	if (master.restart!=NULL)
+	{
+		master_restart();
+		if (master.size_L>0)
+			phase = 0;
+	}
+
+	for (i=0; i<mplrs.size; i++)
+	{
+		master.act_producers[i] = 0;
+		if (i==MASTER)
+			continue;
+		MPI_Irecv(&junk, 1, MPI_FLOAT, i, 9, MPI_COMM_WORLD,
+			  master.sigcheck+i);
+		if (i==CONSUMER)
+			continue;
+		MPI_Irecv(master.workin+i, 1, MPI_UNSIGNED, i, 6, MPI_COMM_WORLD,
+			  master.mworkers+i);
+	}
+
+	while ((master.cobasis_list!=NULL && !master.checkpointing && !want_stop) || master.num_producers>0 || master.live_workers>0)
+	{
+		loopiter++;
+		/* sometimes check if we should update histogram etc */
+		if (!(loopiter&0x1ff))
+		{
+			if (master.maxncob>0 && master.maxncob<=tot_ncob)
+				want_stop = 1;
+			if (master.doing_histogram)
+				print_histogram(&cur, &last);
+		}
+		/* sometimes check if we should checkpoint */
+		if (!(loopiter&0x7ff) && !master.checkpointing)
+		{
+			if (master.stop_filename!=NULL || master.time_limit!=0)
+				check_stop();
+			master_checksigs();
+		}
+			
+		recv_producer_lists();
+
+		/* check if anyone wants work */
+		for (i=0; i<mplrs.size; i++)
+		{	/* consumer and master don't want work */
+			if (i==CONSUMER || i==MASTER)
+				continue;
+			/* workers that have exited can't work */
+			if (master.mworkers[i]==MPI_REQUEST_NULL)
+				continue;
+			if (master.num_producers>0 && master.cobasis_list==NULL)
+			{
+				break; /* no work to give now, but some may
+					* appear later
+					*/
+			}
+
+			if (phase==1 && i!=INITIAL)
+				continue; /* INITIAL gets the first bit */
+			MPI_Test(master.mworkers+i, &flag, MPI_STATUS_IGNORE);
+			if (!flag)
+				continue; /* i is not ready for more work */
+			
+			ncob = master.workin[i];
+			tot_ncob+=ncob;
+			mprintf2(("M: %d looking for work\n", i));
+			if ((master.cobasis_list!=NULL || phase==1) && 
+			    !master.checkpointing && !want_stop)
+			{ /* and not checkpointing! */
+				send_work(i,phase);
+				MPI_Irecv(master.workin+i, 1, MPI_UNSIGNED,i, 6,
+					  MPI_COMM_WORLD, master.mworkers+i);
+				phase=0;
+				if (master.freq!=NULL && ncob>0)
+					fprintf(master.freq, "%d\n", ncob);
+				continue;
+			}
+			/* else tell worker we've finished */
+			mprintf(("M: Saying goodbye to %d, %d left\n", i,
+				 master.live_workers-1));
+			MPI_Isend(&done, 1, MPI_INT, i, 8, MPI_COMM_WORLD,&ign);
+			MPI_Request_free(&ign);
+			master.live_workers--;
+			if (master.freq!=NULL && ncob>0)
+				fprintf(master.freq, "%d\n", ncob);
+		}
+
+		clean_outgoing_buffers();
+	}
+
+	if (master.checkpointing)
+		master_checkpoint();
+
+	send_master_stats();
+	MPI_Finalize();
+	free(master.workin);
+	free(master.mworkers);
+	free(master.act_producers);
+	free(master.sigcheck);
+	return 0;
+}
+
+/* prepare to receive remaining cobases from target.
+ * Since we don't yet know the size of buffers needed, we
+ * only Irecv the header and will Irecv the actual cobases later
+ */
+void master_add_incoming(int target)
+{
+	msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
+	msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*3);
+	msg->buf = (void **)malloc(sizeof(void *)*3);
+	msg->buf[0] = (int *)malloc(sizeof(int) * 3); /* (strlen,lengths,tag) */
+	msg->buf[1] = NULL; /* sizes not known yet */
+	msg->buf[2] = NULL;
+	msg->count = 3;
+	msg->target = target;
+	msg->queue = 1;
+	msg->tags = NULL;
+	msg->sizes = NULL;
+	msg->types = NULL;
+	msg->next = master.incoming;
+	master.incoming = msg;
+	MPI_Irecv(msg->buf[0], 3, MPI_INT, target, 10, MPI_COMM_WORLD,msg->req);	return;
+}
+
+/* check our list of incoming messages from producers about cobases
+ * to add to L.  Add any from messages that have completed.
+ * If the header has completed (msg->queue==1 and header completed),
+ * add the remaining MPI_Irecv's. 
+ * Update num_producers to keep track of how many messages the master
+ * is owed (workers are not allowed to exit until num_producers==0 and
+ * L is empty).
+ * Also update size_L
+ */
+void recv_producer_lists(void)
+{
+	msgbuf *msg, *prev=NULL, *next;
+	int *header;
+	int flag;
+
+	for (msg = master.incoming; msg; msg=next)
+	{
+		next = msg->next;
+		MPI_Test(msg->req, &flag, MPI_STATUS_IGNORE);
+		if (!flag) /* header has not completed yet */
+		{
+			prev = msg;
+			continue;
+		}
+		header = (int *)msg->buf[0];
+		if (msg->queue) /* header completed, and need to Irecv now */
+		{
+			if (header[0]==-1) /* producer returns NOTHING */
+			{
+				master.num_producers--;
+				master.act_producers[msg->target]--;
+				free_msgbuf(msg);
+				if (prev)
+					prev->next = next;
+				else
+					master.incoming = next;
+				continue;
+			}
+			msg->buf[1]= (char*)malloc(sizeof(char)*header[1]);
+			msg->buf[2]= (int *)malloc(sizeof(int)*header[0]);
+			MPI_Irecv(msg->buf[1], header[1], MPI_CHAR, msg->target,
+				  header[2], MPI_COMM_WORLD, msg->req+1);
+			MPI_Irecv(msg->buf[2], header[0], MPI_INT, msg->target,
+				  header[2], MPI_COMM_WORLD, msg->req+2);
+			msg->queue=0;
+			prev = msg;
+			continue;
+		}
+		/* header completed, did the rest? */
+		MPI_Testall(2, msg->req+1, &flag, MPI_STATUSES_IGNORE);
+		if (!flag) /* not yet */
+		{
+			prev = msg;
+			continue;
+		}
+
+		mprintf2(("M: %d returned non-empty producer list (%d, %d)\n",
+			  msg->target, header[0], header[1]));
+		process_returned_cobases(msg);
+
+		mprintf2(("M: Now have size_L=%lu\n",master.size_L));
+
+		if (prev)
+			prev->next = next;
+		else
+			master.incoming = next;
+		master.num_producers--;
+		master.act_producers[msg->target]--;
+		free_msgbuf(msg);
+	}
+	return;
+}
+
+/* msg is a completed, non-empty buffer containing cobases to add to
+ * L.  Process it, add them to L, and update size_L
+ * Basically the inverse of return_unfinished_cobases()
+ */
+void process_returned_cobases(msgbuf *msg)
+{
+	int *header = (int *)msg->buf[0];
+	char *str = (char *)msg->buf[1];
+	int *lengths = (int *)msg->buf[2];
+	int i;
+	char *cob;
+
+	for (i=0; i<header[0]; i++)
+	{
+		cob = (char *)malloc(sizeof(char)*(lengths[i]+1));
+		strncpy(cob, str, lengths[i]);
+		cob[lengths[i]] = '\0';
+		str+=lengths[i];
+		mprintf2(("M: Adding to L: %s\n",cob));
+		master.cobasis_list = addlist(master.cobasis_list, cob);
+	}
+	master.size_L += header[0];
+	master.tot_L += header[0];
+
+	return;
+}
+	
+/* send one unit from L to target.  if phase!=0, this is the first
+ * unit we're sending (i.e. phase 1).  usually, phase==0.
+ * Parameters are scaled and sent in the header.
+ */
+void send_work(int target, int phase)
+{
+	slist *cob;
+	msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
+	int *header;
+	msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
+	msg->buf = (void **)malloc(sizeof(void *)*2);
+	/*{length of work string, int maxdepth, int maxcobases, 5xfuture use} */
+	msg->buf[0] = (int *)malloc(sizeof(int) * 8);
+	header = (int *)msg->buf[0];
+
+	if (phase==0)	/* normal */
+	{
+		cob = master.cobasis_list;
+		master.cobasis_list = cob->next;
+		header[0] = strlen((char *)cob->data);
+		msg->buf[1] = cob->data;
+		setparams(header); /* scale if needed */
+		master.size_L--;
+		if (master.size_L == 0)
+			master.num_empty++;
+		mprintf(("M: Sending work to %d (%d,%d,%d) %s\n",
+			target, header[0], header[1], header[2], 
+			(char*)msg->buf[1]));
+		msg->count = 2;
+		free(cob);
+	}
+	else		/* phase 1 */
+	{
+		header[0] = 0; /* header[0]==0 means initial phase 1 */
+		header[1] = master.initdepth;
+		header[2] = master.maxcobases;
+		mprintf(("M: Sending phase 1 to %d (%d,%d)\n", target,
+			 header[1], header[2]));
+		msg->buf[1] = NULL;
+		msg->count = 1;
+	}
+	msg->target = target;
+	msg->queue = 0;
+	msg->tags = NULL;
+	msg->sizes = NULL;
+	msg->types = NULL;
+
+	/* ready to send */
+	MPI_Isend(header, 8, MPI_INT, target, 1, MPI_COMM_WORLD, msg->req);
+	if (phase==0)
+		MPI_Isend(msg->buf[1], header[0], MPI_CHAR, target, 1,
+			  MPI_COMM_WORLD, msg->req+1);
+	master_add_incoming(target); /* prepare to receive remaining cobases */
+
+	msg->next = mplrs.outgoing;
+	mplrs.outgoing = msg;
+
+	master.act_producers[target]++;
+	master.num_producers++;
+
+	return;
+}
+
+/* header is a work header (length, maxd, maxc) not yet set. 
+ * Set the parameters (maxd, maxc) as desired.
+ */
+void setparams(int *header)
+{
+	/* if L is too small, use maxdepth */
+	if (master.lmin>0 && (master.size_L < mplrs.size*master.lmin))
+		header[1] = master.maxdepth;
+	else /* don't have too small L, so no maxdepth */
+		header[1] = 0;
+	header[2] = master.maxcobases;
+	if (master.lmax>0 && (master.size_L > mplrs.size * master.lmax))
+		header[2] = header[2] * master.scalec;
+}
+
+/* check if we want to stop now.
+ * if master.stop_filename exists or time limit exceeded, 
+ * set master.checkpointing = 1 and inform consumer.
+ * this is not an immediate stop -- we wait for current workers to
+ * complete the tasks they've been assigned, stopping after that.
+ */
+void check_stop(void)
+{
+	int check[3] = {CHECKFLAG,0,0};
+	MPI_Request ign;
+	struct timeval cur;
+	int flag = 0;
+
+	if (master.stop_filename)
+	{
+		mprintf2(("M: checking stop file %s\n", master.stop_filename));
+		master.stop = fopen(master.stop_filename, "r");
+		if (master.stop!=NULL)
+		{
+			flag=1;
+			fclose(master.stop);
+		}
+	}
+
+	if (master.time_limit!=0)
+	{
+		mprintf2(("M: checking if time exceeded\n"));
+		gettimeofday(&cur, NULL);
+		if (cur.tv_sec - mplrs.start.tv_sec > master.time_limit)
+			flag=1;
+	}
+	if (flag!=0)
+	{
+		mprintf(("M: Stop condition detected, checkpointing!\n"));
+		master.checkpointing = 1;
+		MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
+		MPI_Request_free(&ign);
+	}
+}
+
+/* check if we've caught a signal, or we've received a message from someone
+ * that has.  If so, we want to checkpoint like above
+ */
+void master_checksigs(void)
+{
+	int i, flag, size=mplrs.size;
+	int check[3] = {CHECKFLAG,0,0};
+	MPI_Request ign;
+	if (mplrs.caughtsig == 1)
+	{
+		mprintf(("M: I caught signal, checkpointing!\n"));
+		MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
+		master.checkpointing = 1;
+		return;
+	}
+	for (i=1; i<size; i++)
+	{
+		MPI_Test(master.sigcheck+i, &flag, MPI_STATUS_IGNORE);
+		if (flag)
+		{
+			mprintf(("M: %d caught signal, checkpointing!\n",i));
+			MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
+			master.checkpointing = 1;
+			return;
+		}
+	}
+}
+
+/* we're checkpointing, receive stats from consumer and output checkpoint file
+ */
+/* two cases: we have a file to checkpoint to (normal)
+ * or, we caught a signal and want to checkpoint but have no checkpoint file
+ * in this case we append to the output file via the consumer, with
+ * comments on where to cut
+ */
+/* mplrs1 format: counts are 'unsigned int' (%d unfortunately)
+ * mplrs2 format: counts are 'unsigned long' (%lu)
+ * so mplrs1 files can be read by mplrs2 readers not necc. reverse
+ */
+void master_checkpoint(void)
+{
+	int fin = -1;
+	mprintf(("M: making checkpoint file\n"));
+	recv_counting_stats(CONSUMER);
+	if (master.checkp_filename != NULL)
+	{
+		MPI_Send(&fin, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
+		master_checkpointfile();
+		return;
+	}
+	else
+	{
+		fin = 1;
+		MPI_Send(&fin, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
+		master_checkpointconsumer();
+		return;
+	}
+}
+
+/* note: master_checkpointconsumer and master_checkpointfile should
+ * produce the *same* format. First two lines of checkpointconsumer
+ * done by the consumer directly for now.
+ */
+void master_checkpointconsumer(void)
+{
+	int len;
+	char *str;
+	slist *list, *next;
+	for (list=master.cobasis_list; list; list=next)
+	{
+		next = list->next;
+		str = (char*)list->data;
+		len = strlen(str)+1; /* include \0 */
+		MPI_Send(&len, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
+		MPI_Send(str, len, MPI_CHAR, CONSUMER, 1, MPI_COMM_WORLD);
+		free(str);
+		free(list);
+	}
+	len = -1;
+	MPI_Send(&len, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
+}
+void master_checkpointfile(void)
+{
+	slist *list, *next;
+	char *vol = cprat("", mplrs.Vnum, mplrs.Vden);
+	fprintf(master.checkp, "mplrs3\n%lu %lu %lu %lu %lu\n%s\n", 
+		mplrs.rays, mplrs.vertices, mplrs.bases, mplrs.facets,
+		mplrs.intvertices,vol);
+	free(vol);
+	for (list=master.cobasis_list; list; list=next)
+	{
+		next = list->next;
+		fprintf(master.checkp, "%s\n", (char *)list->data);
+		free(list->data);
+		free(list);
+		master.size_L--;
+	}
+	fclose(master.checkp);
+	return;
+}
+
+/* we want to restart. load L and counting stats from restart file,
+ * send counting stats to consumer (after notifying consumer of restart)
+ */
+void master_restart(void)
+{
+	char *line=NULL;
+	char *vol=NULL;
+	size_t size=0, vsize=0;
+	ssize_t len=0;
+	int restart[3] = {RESTARTFLAG,0,0};
+	int ver;
+
+	/* check 'mplrs1' header */
+	len = getline(&line, &size, master.restart);
+	if (len!=7 || (strcmp("mplrs1\n",line) && strcmp("mplrs2\n",line) && 
+		       strcmp("mplrs3\n",line)))
+	{
+		printf("Unknown checkpoint format\n");
+		/* MPI_Finalize(); */
+		exit(0);
+	}
+
+	mprintf2(("M: found checkpoint header\n"));
+	sscanf(line,"mplrs%d\n",&ver);
+
+	/* get counting stats */
+	fscanf(master.restart, "%lu %lu %lu %lu %lu\n",
+	       &mplrs.rays, &mplrs.vertices, &mplrs.bases, &mplrs.facets,
+	       &mplrs.intvertices);
+	if (ver<3) /* volume added in mplrs3 */
+		printf("*Old checkpoint file, volume may be incorrect\n");
+	else /* get volume */
+	{
+		len = getline(&vol, &vsize, master.restart);
+		if (len<=1)
+		{
+			printf("Broken checkpoint file\n");
+			exit(0);
+		}
+		vol[len-1] = '\0'; /* remove '\n' */
+		plrs_readrat(mplrs.Tnum, mplrs.Tden, vol);
+		copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
+		linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
+		       1L, mplrs.Vnum, mplrs.Vden);
+		free(vol);
+	}
+		
+	/* get L */
+	while((len = getline(&line, &size, master.restart))!= -1)
+	{
+		if (line[0]=='\n') /* ignore blank lines */
+		{
+			free(line);
+			line = NULL;
+			size=0;
+			continue;
+		}
+		line[strlen(line)-1]='\0'; /* replace \n by \0 */
+		master.cobasis_list = addlist(master.cobasis_list, line);
+		master.size_L++;
+		line = NULL;
+		size = 0;
+	}
+	master.tot_L = master.size_L; /* maybe should save and retrieve */
+	mprintf(("M: Restarted with |L|=%lu\n",master.size_L)); 
+	fclose(master.restart);
+
+	MPI_Send(restart, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD);
+	send_counting_stats(CONSUMER);
+	mplrs.rays = 0;
+	mplrs.vertices = 0;
+	mplrs.bases = 0;
+	mplrs.facets = 0;
+	mplrs.intvertices = 0;
+
+	return;
+}
+
+/* check if we should update the histogram and then do it */
+void print_histogram(timeval *cur, timeval *last)
+{
+	float sec;
+	int i;
+	int act;
+
+	gettimeofday(cur,NULL);
+	if (cur->tv_sec > last->tv_sec)
+	{
+		sec = (float)(cur->tv_sec - mplrs.start.tv_sec) + ((float)(cur->tv_usec - mplrs.start.tv_usec))/1000000;
+		act=0;
+		for (i=0; i<mplrs.size; i++)
+			if (master.act_producers[i])
+				act++;
+		/* time (seconds), number of active producers, number of
+		 * producers owing us a message about remaining cobases,
+		 * 0, 0 (existed in mplrs.cpp but no longer)
+		 */
+		fprintf(master.hist, "%f %d %lu %d %d %d %lu\n",
+			sec, act, master.size_L, master.num_producers, 0, 0, master.tot_L);
+		fflush(master.hist);
+		last->tv_sec = cur->tv_sec;
+		last->tv_usec = cur->tv_usec;
+	}
+}
+
+/**********
+ * worker *
+ **********/
+
+int mplrs_worker(void)
+{
+	char *starting_cobasis;
+	/* header for incoming work:
+	 * {length of work string, int maxdepth, int maxcobases, 5xfuture use}
+	 */ 
+	int header[8]={0,0,0,0,0,0,0,0};
+	MPI_Request req = MPI_REQUEST_NULL;
+	unsigned int ncob=0; /* used for # cobases in prev. job */
+	unsigned int tot_ncob=0; 
+	int len;
+	int flag;
+
+	while (1)
+	{
+		ncob = mplrs.bases - tot_ncob; /* #cobases in last job */
+		tot_ncob = mplrs.bases; /* #cobases done so far */
+		/* check signals */
+		mplrs_handlesigs();
+		/* ask for work */
+		mprintf2(("%d: Asking master for work\n",mplrs.rank));
+		MPI_Isend(&ncob, 1, MPI_UNSIGNED, MASTER, 6, MPI_COMM_WORLD, 
+			  &req);
+		flag = 0;
+		while (1) /* was MPI_Wait(&req, MPI_STATUS_IGNORE); */
+		{
+			MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
+			if (flag)
+				break;
+			clean_outgoing_buffers();
+		}
+
+		starting_cobasis = NULL;
+		/* get response */
+		MPI_Recv(header, 8, MPI_INT, MASTER, MPI_ANY_TAG,
+			 	 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+		mprintf2(("%d: Message received from master\n",mplrs.rank));
+
+		len = header[0];	
+
+		if (len==-1) /* no more work to do */
+			return mplrs_worker_finished();
+
+		if (len>0)
+		{
+			starting_cobasis = (char*)malloc(sizeof(char)*(len+1));
+			MPI_Recv(starting_cobasis, len, MPI_CHAR, MASTER,
+			 	 MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+			starting_cobasis[len] = '\0';
+		}
+
+		/* do work */
+		do_work(header, starting_cobasis);
+		free(starting_cobasis);
+
+		/* send output and unfinished cobases */
+		process_output();
+		return_unfinished_cobases();
+
+		clean_outgoing_buffers(); /* check buffered sends, 
+					   * free if finished
+					   */
+	}
+	return 0; /* unreachable */
+}
+
+/* This worker has finished.  Tell the consumer, send counting stats
+ * and exit.
+ */
+int mplrs_worker_finished(void)
+{
+	int done[3] = {-1,-1,-1};
+
+	mprintf((" %d: All finished! Informing consumer.\n",mplrs.rank));
+
+	while (mplrs.outgoing) /* needed? negligible in any case */
+	{
+		clean_outgoing_buffers();
+	}
+	MPI_Send(&done, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD);
+	send_counting_stats(CONSUMER);
+	MPI_Finalize();
+	return 0;
+}	
+
+/* Go through our outgoing MPI_Isends, free anything that has completed.
+ * Also, if any of these are queued and the header has completed, then
+ * send the remaining data.
+ * Don't use with incoming buffers.
+ */
+void clean_outgoing_buffers(void)
+{
+	msgbuf *msg, *next, *prev=NULL;
+	
+	for (msg = mplrs.outgoing; msg; msg=next)
+	{
+		next = msg->next;
+		if (!outgoing_msgbuf_completed(msg))
+		{
+			prev = msg;
+			continue;
+		}
+		if (prev)
+			prev->next = next;
+		else
+			mplrs.outgoing = next;
+		free_msgbuf(msg);
+	}
+}
+
+/* header[1] gives maxdepth, header[2] gives maxcobases,
+ * if header[0] > 0, 
+ *     starting_cobasis gives the starting cobasis.
+ * if header[0] == 0, starting at initial input (phase 1)
+ */
+void do_work(const int *header, const char *starting_cobasis)
+{
+	char *argv[] = {mplrs.tfn};
+	/* prepare input file */
+	mplrs.initializing = 0;
+	mplrs.tfile = fopen(mplrs.tfn, "w");
+	fprintf(mplrs.tfile, "%s", mplrs.input);
+
+	mprintf3(("%d: Received work (%d,%d,%d)\n",mplrs.rank,header[0],
+						   header[1],header[2]));
+	if (header[0]>0)
+		fprintf(mplrs.tfile, "\nmindepth 0\nrestart %s\n",
+				     starting_cobasis);
+	else
+		mplrs.initializing = 1; /* phase 1 output handled different */
+
+	if (header[1]>0)
+		fprintf(mplrs.tfile, "\nmaxdepth %d\n", header[1]);
+	if (header[2]>0)
+		fprintf(mplrs.tfile, "\nmaxcobases %d\n", header[2]);
+
+	if (mplrs.countonly == 1)
+		fprintf(mplrs.tfile, "countonly\n");
+
+	fclose(mplrs.tfile);
+
+	mprintf2(("%d: Calling lrs_main\n",mplrs.rank));
+	lrs_main(1, argv);
+	mprintf2(("%d: lrs_main returned\n",mplrs.rank)); 
+	if (remove(mplrs.tfn) != 0) /* UNsynchronized printf -- should fix */
+		printf("Error deleting thread file!\n");
+}
+
+/* The worker has finished its work.  Process the output, preparing
+ * and sending the output to the consumer, and preparing the unfinished
+ * cobases for return_unfinished_cobases().
+ */
+void process_output(void)
+{
+	outlist *out = mplrs.output_list, *next;
+	char *out_string=NULL; /* for output file if exists */
+	const char *type; /* because plrs_output is C++ at the moment */
+	const char *data; /* because plrs_output is C++ at the moment */
+	int len = 1024;
+
+	mplrs.outnum = 0; /* clearing buffer */
+	mplrs.output_list = NULL;
+
+	out_string = (char *)malloc(sizeof(char)*len);
+	out_string[0]='\0';
+
+	/* reverse when initializing to get correct order */
+	if (mplrs.initializing)
+		out = reverse_list(out);
+
+	while (out)
+	{
+		type = out->type;
+		data = out->data;
+		if (!strcmp(type, "vertex"))
+			out_string = append_out(out_string, &len, data);
+		else if (!strcmp(type, "ray"))
+			out_string = append_out(out_string, &len, data);
+		else if (!strcmp(type, "cobasis"))
+			process_cobasis(data);
+		else if (!strcmp(type, "V cobasis"))
+			out_string = append_out(out_string, &len, data);
+		else if (!strcmp(type, "facet count"))
+			mplrs.facets += atoi(data);
+		else if (!strcmp(type, "ray count"))
+			mplrs.rays += atoi(data);
+		else if (!strcmp(type, "basis count"))
+			mplrs.bases += atoi(data);
+		else if (!strcmp(type, "vertex count"))
+			mplrs.vertices += atoi(data);
+		else if (!strcmp(type, "integer vertex count"))
+			mplrs.intvertices += atoi(data);
+		else if (!strcmp(type, "volume"))
+		{
+			plrs_readrat(mplrs.Tnum, mplrs.Tden, data);
+			copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
+			linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
+			       1L, mplrs.Vnum, mplrs.Vden);
+		}
+		else if (!strcmp(type, "options warning"))
+		{
+			/* only do warnings once, otherwise repeated */
+			if (mplrs.initializing)
+				out_string = append_out(out_string, &len, data);
+		}
+		else if (!strcmp(type, "header"))
+		{
+			/*only do header if initializing, otherwise repeated*/
+			if (mplrs.initializing)
+				out_string = append_out(out_string, &len, data);
+		}
+		else if (!strcmp(type, "debug"))
+		{
+			out_string = append_out(out_string, &len, data);
+		}
+
+		next = out->next;
+		free(out->type);
+		free(out->data);
+		free(out);
+		out = next;
+	}
+
+	if (strlen(out_string)>0 && strcmp(out_string, "\n"))
+		send_output(1, out_string);
+	else
+		free(out_string);
+}
+
+/* send this string to the consumer to output.
+ * If dest!=0, then it goes to the output file (stdout if no output file).
+ * If dest==0, then it goes to stdout.
+ *
+ * The pointer str is surrendered to send_output and should not be changed
+ * It will be freed once the send is complete.
+ */
+/* str should not be NULL */
+void send_output(int dest, char *str)
+{
+	msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
+	int *header = (int *)malloc(sizeof(int)*3);
+
+	header[0] = dest;
+	header[1] = strlen(str);
+	header[2] = mplrs.my_tag; /* to ensure the dest/str pair
+				   * remains intact even if another
+				   * send happens in between
+				   */
+	msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
+	msg->buf = (void **)malloc(sizeof(void *)*2);
+	msg->buf[0] = header;
+	msg->buf[1] = str;
+
+	msg->count = 2;
+	msg->target = CONSUMER;
+	msg->queue = 1;
+
+	msg->tags = (int *)malloc(sizeof(int)*2);
+	msg->sizes = (int *)malloc(sizeof(int)*2);
+	msg->types = (MPI_Datatype *)malloc(sizeof(MPI_Datatype)*2);
+
+	msg->types[1] = MPI_CHAR;
+	msg->sizes[1] = header[1]+1;
+
+	msg->tags[1] = mplrs.my_tag;
+
+	mplrs.my_tag++;
+
+	msg->next = mplrs.outgoing;
+	mplrs.outgoing = msg;
+	MPI_Isend(header, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD,
+		  msg->req);
+}
+
+/* called from process_output to handle a 'cobasis',
+ * add to queue to return to master
+ */
+/* awful string-hacking, basically copied from plrs processCobasis() */
+/* First, if first characters are 'F#', it's a hull. otherwise it's not.
+ * Then, remove ignore_chars.
+ * Then, remove everything starting from the first 'I'
+ * Then, copy everything verbatim, except:
+ *    if it's a hull, replace everything between second and third spaces
+ *                    by '0'
+ *    if it's not a hull, replace everything between third and fourth
+ *                    spaces by '0'
+ * (this is resetting the depth to be 0 for a restart)
+ */
+void process_cobasis(const char *newcob)
+{
+	int nlen = strlen(newcob);
+	char *buf = (char *)malloc(sizeof(char) * (nlen+1));
+	int i,j,k;
+	int num_spaces=0; /* we count the number of spaces */
+	char ignore_chars[] = "#VRBh=facetsFvertices/rays";
+	char c;
+	int num_ignore = strlen(ignore_chars);
+	int hull = 0;
+	int replace = 0;
+
+	mprintf3(("%d: process_cobasis( %s )", mplrs.rank, newcob));
+
+	if (nlen>1 && newcob[0]=='F' && newcob[1]=='#')
+		hull = 1;
+
+	for (i=0,j=0; i<=nlen; i++)
+	{
+		c = newcob[i];
+		/* ignore ignore_chars */
+		for (k=0; k<=num_ignore; k++)
+			if (c == ignore_chars[k])
+				break;
+		if (k<=num_ignore)
+			continue;
+
+		if (c=='I')
+			break;
+
+		if (c==' ') /* count spaces to set depth to 0 for restart */
+		{
+			num_spaces++;
+			if ( (num_spaces==2 && hull==1) ||
+			     (num_spaces==3 && hull==0) )
+				replace = 1;
+			else if ( (num_spaces==3 && hull==1) ||
+				  (num_spaces==4 && hull==0) )
+				replace = 0;
+			buf[j++] = c; /* copy spaces */
+			continue;
+		}
+		
+		if (replace == 1)
+			buf[j++]='0';
+		else
+			buf[j++]=c;
+	}
+	buf[j]='\0';
+
+	mprintf3((" produced %s\n",buf));
+	mplrs.cobasis_list = addlist(mplrs.cobasis_list, buf);
+}
+
+inline slist *addlist(slist *list, void *buf)
+{
+	slist *n = (slist *)malloc(sizeof(struct slist));
+	n->data = buf;
+	n->next = list;
+	return n;
+}
+
+/* mplrs.cobasis_list may have things to send to the master.
+ * Send the header, and then the cobases to add to L.
+ */
+void return_unfinished_cobases(void)
+{
+	int listsize;
+	slist *list, *next;
+	int *lengths=NULL;
+	char *cobases=NULL;
+	int size = 0;
+	int i;
+	int start;
+	/* header is (strlen(cobases), length of lengths, mplrs.my_tag) */
+	int *header = (int *)malloc(sizeof(int)*3);
+	msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
+	msg->target = MASTER;
+
+	for (listsize=0, list=mplrs.cobasis_list; list; list=list->next)
+	{
+		listsize++;
+		size += strlen((char *)list->data);
+	}
+
+	if (listsize == 0)
+	{
+		header[0] = -1;
+		header[1] = -1;
+		header[2] = -1;
+		msg->buf = (void **)malloc(sizeof(void *));
+		msg->buf[0] = header;
+		msg->count = 1;
+		msg->req = (MPI_Request *)malloc(sizeof(MPI_Request));
+		msg->queue = 0;
+		msg->tags = NULL;
+		msg->sizes = NULL;
+		msg->types = NULL;
+		MPI_Isend(header, 3, MPI_INT, MASTER, 10, MPI_COMM_WORLD,
+			  msg->req);
+		msg->next = mplrs.outgoing;
+		mplrs.outgoing = msg;
+		return;
+	}
+
+	lengths = (int *)malloc(sizeof(int)*listsize);  /*allows unconcatenate*/
+	cobases = (char *)malloc(sizeof(char)*(size+1));/*concatenated + 1 \0*/
+
+	for (start=0, i=0, list=mplrs.cobasis_list; list; list=next, i++)
+	{
+		next = list->next;
+
+		strcpy(cobases+start, (char *)list->data);
+		lengths[i] = strlen((char *)list->data);
+		start+=lengths[i];
+
+		free(list->data);
+		free(list);
+	}
+	/* final \0 is there */
+
+	header[0] = listsize;
+	header[1] = size+1;
+	header[2] = mplrs.my_tag;
+
+	msg->req = (MPI_Request *)malloc(sizeof(MPI_Request) * 3);
+	msg->buf = (void **)malloc(sizeof(void *) * 3);
+	msg->buf[0] = header;
+	msg->buf[1] = cobases;
+	msg->buf[2] = lengths;
+
+	msg->count = 3;
+	msg->queue = 0;
+	msg->tags = NULL;
+	msg->sizes = NULL;
+	msg->types = NULL;
+
+	mprintf2(("%d: Queued send of %d cobases for L\n",mplrs.rank,listsize));
+	MPI_Isend(header, 3, MPI_INT, MASTER, 10, MPI_COMM_WORLD, msg->req);
+	MPI_Isend(cobases, header[1], MPI_CHAR, MASTER, mplrs.my_tag,
+		  MPI_COMM_WORLD, msg->req+1);
+	MPI_Isend(lengths, listsize, MPI_INT, MASTER, mplrs.my_tag, 
+		  MPI_COMM_WORLD, msg->req+2);
+	mplrs.my_tag++;
+
+	msg->next = mplrs.outgoing;
+	mplrs.outgoing = msg;
+	mplrs.cobasis_list = NULL;
+}
+
+/* dest is a string in a buffer with size *size.
+ * Append src and a newline to the string, realloc()ing as necessary,
+ * returning the new pointer and updating size.
+ */
+char *append_out(char *dest, int *size, const char *src)
+{
+	int len1 = strlen(dest);
+	int len2 = strlen(src);
+	int newsize = *size;
+	char *newp = dest;
+
+	if (len1 + len2 + 2 > *size)
+	{
+		newsize = newsize<<1;
+		while ((newsize < len1+len2+2) && newsize)
+			newsize = newsize<<1;
+
+		if (!newsize)
+			newsize = len1+len2+2;
+
+		newp = (char *)realloc(dest, sizeof(char) * newsize);
+		if (!newp)
+		{
+			newsize = len1+len2+2;
+			newp = (char *)realloc(dest, sizeof(char) * newsize);
+			if (!newp)
+			{
+				printf("%d: Error no memory (%d)\n",mplrs.rank,
+								    newsize);
+				/* MPI_Finalize(); */
+				exit(2);
+			}
+		}
+		*size = newsize;
+	}
+
+	strncat(newp, src, len2);
+	newp[len1+len2]='\n';
+	newp[len1+len2+1]='\0';
+	return newp;
+}
+
+/************
+ * consumer *
+ ************/
+
+int mplrs_consumer(void)
+{
+	int i;
+
+	initial_print(); 	/* print version and other information */
+	/* initialize MPI_Requests and 3*int buffers for incoming messages */
+	consumer.prodreq = (MPI_Request*)malloc(sizeof(MPI_Request)*mplrs.size);
+	consumer.prodibf = (int *)malloc(sizeof(int)*3*mplrs.size);
+	consumer.num_producers = mplrs.size - 2;
+	for (i=0; i<mplrs.size; i++)
+	{
+		if (i==CONSUMER)
+			continue;
+		MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
+			  MPI_COMM_WORLD, consumer.prodreq+i);
+	}
+
+	while (consumer.num_producers>0 || consumer.incoming || 
+	       consumer.waiting_initial)
+	{
+		/* check if someone is trying to send us output */
+		/* if so, queue any incoming messages */
+		consumer_start_incoming();
+
+		/* check for completed message to process */
+		consumer_proc_messages();
+
+		/* check signals */
+		mplrs_handlesigs();
+	}
+
+	mprintf2(("C: getting stats and exiting\n"));
+
+	for (i=0; i<mplrs.size; i++)
+	{
+		if (i==CONSUMER || i==MASTER)
+			continue;
+		recv_counting_stats(i);
+	}
+
+	free(consumer.prodreq);
+	free(consumer.prodibf);
+	consumer.prodreq = NULL; consumer.prodibf = NULL;
+
+	if (consumer.checkpoint) /*not really finished, coordinate with master*/
+		return consumer_checkpoint();
+
+	recv_master_stats(); /* gets stats on size of L, etc */
+	final_print();
+	MPI_Finalize();
+	return 0;
+}
+
+/* check if anyone is trying to send us their output */
+/* if master is trying, probably means we're going to checkpoint */
+/* in any case, queue any incoming messages          */
+void consumer_start_incoming(void)
+{
+	int i;
+	int flag;
+	for (i=0; i<mplrs.size; i++)
+	{
+		/* don't check ourself and workers that have exited */
+		if (i==CONSUMER || consumer.prodreq[i] == MPI_REQUEST_NULL)
+			continue;
+		MPI_Test(consumer.prodreq+i, &flag, MPI_STATUS_IGNORE);
+		if (!flag) /* not incoming, check next */
+			continue;
+		mprintf3(("C: received message from %d\n",i));
+		if (i==MASTER && consumer.prodibf[0]==CHECKFLAG)
+		{	/* start checkpoint */
+			consumer.checkpoint = 1;
+			mprintf(("*Checkpointing\n"));
+			continue;
+		}
+		else if (i==MASTER && consumer.prodibf[0]==RESTARTFLAG)
+		{
+			/* get counting stats for restart */
+			recv_counting_stats(MASTER);
+			consumer.waiting_initial = 0;
+			mprintf(("C: Restarted\n"));
+			/* master may restart and later checkpoint */
+			MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
+                          	  MPI_COMM_WORLD, consumer.prodreq+i);
+			continue;
+		}
+		
+		if (consumer.prodibf[3*i]<=0 && consumer.prodibf[3*i+1]<=0)
+		{
+			/* producer i has finished and will exit */
+			consumer.num_producers--;
+			mprintf(("C: Producer %d reported exiting, %d left.\n",
+				 i, consumer.num_producers));
+			continue;
+		}
+
+		/* otherwise, we have the normal situation -- the producer
+		 * wants to send us some output.
+		 */
+		consumer.incoming =
+			 consumer_queue_incoming(consumer.prodibf+3*i, i);
+		MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
+			  MPI_COMM_WORLD, consumer.prodreq+i);
+	}
+}
+
+/* target has sent us this header. Queue the corresponding receive and
+ * return the new value of consumer.incoming.
+ */
+msgbuf *consumer_queue_incoming(int *header, int target)
+{
+	msgbuf *curhead = consumer.incoming;
+	msgbuf *newmsg = (msgbuf *)malloc(sizeof(msgbuf));
+
+	newmsg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
+	newmsg->buf = (void **)malloc(sizeof(void *));
+	newmsg->buf[0] = (char *)malloc(sizeof(char)*(header[1]+1));
+	newmsg->count = 1;
+	newmsg->target = target;
+	newmsg->next = curhead;
+	newmsg->queue = 0;
+	newmsg->tags = NULL;
+	newmsg->sizes = NULL;
+	newmsg->types = NULL;
+
+	newmsg->data = header[0]; /* bound for stdout or output file */
+
+	/* get my_tag from producer via header[2] to uniquely identify msg */
+	MPI_Irecv(newmsg->buf[0], header[1]+1, MPI_CHAR, target, header[2],
+		  MPI_COMM_WORLD, newmsg->req);
+
+	mprintf3(("C: Receiving from %d (%d,%d,%d)",target,header[0],header[1],
+						    header[2]));
+	return newmsg;
+}
+
+/* check our incoming messages, process and remove anything that
+ * has completed
+ */
+void consumer_proc_messages(void)
+{
+	msgbuf *msg, *prev=NULL, *next;
+	int i,len;
+
+	for (msg=consumer.incoming; msg; msg=next)
+	{
+		next=msg->next;
+		if (outgoing_msgbuf_completed(msg))
+		{
+			if (consumer.waiting_initial &&
+			    msg->target != INITIAL)
+				continue;
+
+			if (msg->data!=0)
+				fprintf(consumer.output, "%s",
+					(char*)msg->buf[0]);
+
+			/* we wait on all other output until we've printed
+			 * the initial output containing ...begin\n
+			 * to ensure pretty output, if this is the begin,
+			 * flip the flag.
+			 */
+			if (consumer.waiting_initial)
+			{
+				len=strlen((char *)msg->buf[0])-5;
+				for (i=0; i<len; i++)
+					if (!strncmp("begin\n",
+						     (char*)msg->buf[0]+i,6))
+					{
+						mprintf3(("C: found begin\n"));
+						phase1_print();
+						consumer.waiting_initial = 0;
+						break;
+					}
+			}
+
+			free_msgbuf(msg);
+			if (prev)
+				prev->next = next;
+			else
+				consumer.incoming = next;
+			continue;
+		}
+		prev=msg;
+	}
+}
+
+/* We are checkpointing instead of a normal exit.
+ * Send counting stats to master, then exit quietly
+ */
+int consumer_checkpoint(void)
+{
+	int len;
+	char *str;
+	char *vol = cprat("", mplrs.Vnum, mplrs.Vden);
+	send_counting_stats(MASTER);
+	MPI_Recv(&len, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD,
+		 MPI_STATUS_IGNORE);
+	if (len == -1) /* master produces checkpoint file */
+	{
+		MPI_Finalize();
+		return 0;
+	}
+	fprintf(consumer.output, "*Checkpoint file follows this line\n");
+	fprintf(consumer.output, "mplrs3\n%lu %lu %lu %lu %lu\n%s\n", 
+		mplrs.rays, mplrs.vertices, mplrs.bases, mplrs.facets,
+		mplrs.intvertices,vol);
+	free(vol);
+	while (1)
+	{
+		MPI_Recv(&len, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD,
+			 MPI_STATUS_IGNORE);
+		if (len<0)
+			break;
+		str = (char*)malloc(sizeof(char)*len);
+		MPI_Recv(str, len, MPI_CHAR, MASTER, 1, MPI_COMM_WORLD,
+			 MPI_STATUS_IGNORE);
+		fprintf(consumer.output,"%s\n",str);
+		free(str);
+	}
+	fprintf(consumer.output,"*Checkpoint finished above this line\n");
+	MPI_Finalize();
+	return 0;
+}
+
+/* check whether this outgoing msgbuf has completed.
+ * If the msg is queued (msg->queue == 1),
+ *    then if the first part has completed, send the remaining parts
+ * Don't use with *queued* incoming msgbuf.
+ */
+inline int outgoing_msgbuf_completed(msgbuf *msg)
+{
+	int flag;
+	int count = msg->count;
+	int i;
+	if (msg->queue != 1)
+	{
+		MPI_Testall(count, msg->req, &flag, MPI_STATUSES_IGNORE);
+		return flag;
+	}
+	MPI_Test(msg->req, &flag, MPI_STATUS_IGNORE);
+	if (!flag)
+		return flag;
+	/* first completed, send the rest of the queued send */	
+	mprintf3(("%d: Sending second part of queued send to %d\n",
+		  mplrs.rank, msg->target));
+	for (i=1; i<count; i++)
+	{
+		if (msg->buf[i])
+			MPI_Isend(msg->buf[i], msg->sizes[i], msg->types[i],
+			  	msg->target, msg->tags[i], MPI_COMM_WORLD,
+			  	msg->req+i);
+	}
+	msg->queue = 0;
+	return 0;
+}
+
+inline void free_msgbuf(msgbuf *msg)
+{
+	int i;
+	for (i=0; i<msg->count; i++)
+		free(msg->buf[i]);
+	free(msg->buf);
+	free(msg->req);
+	free(msg->tags);
+	free(msg->sizes);
+	free(msg->types);
+	free(msg);
+	return;
+}
+
+outlist *reverse_list(outlist* head)
+{
+	outlist * last = head, * new_head = NULL;
+	while(last)
+	{
+		outlist * tmp = last;
+		last = last->next;
+		tmp->next = new_head;
+		new_head = tmp;
+	}
+	return new_head;
+}
+
+/* send stats on size of L, etc */
+void send_master_stats(void)
+{
+	unsigned long stats[4] = {master.tot_L, master.num_empty, 0, 0};
+	MPI_Send(stats, 4, MPI_UNSIGNED_LONG, CONSUMER, 1, MPI_COMM_WORLD);
+	return;
+}
+/* get master stats on size of L, etc */
+void recv_master_stats(void)
+{
+	unsigned long stats[4] = {0,0,0,0};
+	MPI_Recv(stats, 4, MPI_UNSIGNED_LONG, MASTER, 1, MPI_COMM_WORLD, 
+		 MPI_STATUS_IGNORE);
+	master.tot_L = stats[0];
+	master.num_empty = stats[1];
+	return;
+}
+
+/* send stats to target for final print */
+void send_counting_stats(int target)
+{
+	char *vol = NULL;
+	if (mplrs.facets>0)
+		vol = cprat("", mplrs.Vnum, mplrs.Vden);
+	else
+	{
+		vol = (char *)malloc(sizeof(char)*2);
+		vol[0] = '0'; vol[1] = '\0';
+	}
+	unsigned long stats[6] = {mplrs.rays, mplrs.vertices, mplrs.bases, 
+			          mplrs.facets, mplrs.intvertices,
+				  strlen(vol)+1};
+	MPI_Send(stats, 6, MPI_UNSIGNED_LONG, target, 1, MPI_COMM_WORLD);
+	MPI_Send(vol, stats[5], MPI_CHAR, target, 1, MPI_COMM_WORLD);
+	free(vol);
+	return;
+}
+
+/* gets counting stats from target */
+void recv_counting_stats(int target)
+{
+	char *vol;
+	unsigned long stats[6];
+	MPI_Recv(stats, 6, MPI_UNSIGNED_LONG, target, 1, MPI_COMM_WORLD,
+		 MPI_STATUS_IGNORE);
+	mplrs.rays+=stats[0];
+	mplrs.vertices+=stats[1];
+	mplrs.bases+=stats[2];
+	mplrs.facets+=stats[3];
+	mplrs.intvertices+=stats[4];
+
+	vol = (char*)malloc(sizeof(char)*stats[5]);
+	MPI_Recv(vol, stats[5], MPI_CHAR, target, 1, MPI_COMM_WORLD,
+		 MPI_STATUS_IGNORE);
+	plrs_readrat(mplrs.Tnum, mplrs.Tden, vol);
+	copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
+	linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
+	       1L, mplrs.Vnum, mplrs.Vden);
+	free(vol);
+	return;
+}
+
+/* do the initial print */
+void initial_print(void)
+{
+		fprintf(consumer.output, "*mplrs:%s%s(%s)%d processes\n%s\n",
+			TITLE,VERSION,ARITH,mplrs.size,AUTHOR);
+		fprintf(consumer.output, "*Input taken from %s\n",
+			mplrs.input_filename);
+		fprintf(consumer.output, "*Starting depth of %d maxcobases=%d ",
+			master.initdepth, master.maxcobases);
+		fprintf(consumer.output, "maxdepth=%d lmin=%d lmax=%d scale=%d\n",
+			master.maxdepth, master.lmin,
+			master.lmax, master.scalec);
+		if (mplrs.countonly)
+			fprintf(consumer.output, "*countonly\n");
+		if (consumer.output==stdout)
+			return;
+		printf("*mplrs:%s%s(%s)%d processes\n%s\n",TITLE,VERSION,ARITH,
+		       mplrs.size,AUTHOR);
+		printf("*Input taken from %s\n",mplrs.input_filename);
+		printf("*Output written to: %s\n",consumer.output_filename);
+		printf("*Starting depth of %d maxcobases=%d ", master.initdepth,
+		       master.maxcobases);
+		printf("maxdepth=%d lmin=%d lmax=%d scale=%d\n", master.maxdepth,
+		       master.lmin, master.lmax, master.scalec);
+		if (mplrs.countonly)
+			printf("*countonly\n");
+}
+
+/* do the "*Phase 1 time: " print */
+inline void phase1_print(void)
+{
+	timeval cur;
+	gettimeofday(&cur, NULL);
+	printf("*Phase 1 time: %ld seconds.\n",cur.tv_sec-mplrs.start.tv_sec);
+	return;
+}
+
+/* do the final print */
+void final_print(void)
+{
+	timeval end;
+	char *vol=NULL;
+	gettimeofday(&end, NULL);
+
+	fprintf(consumer.output, "end\n");
+	printf("*Total number of jobs: %lu, L became empty %lu times\n", master.tot_L, master.num_empty);
+	if (mplrs.facets>0)
+	{
+		vol = cprat("*Volume=",mplrs.Vnum,mplrs.Vden);
+		fprintf(consumer.output,"%s\n",vol);
+		fprintf(consumer.output,"*Totals: facets=%lu bases=%lu\n",
+			mplrs.facets, mplrs.bases);
+	}
+	else
+		fprintf(consumer.output, "*Totals: vertices=%lu rays=%lu bases=%lu integer-vertices=%lu\n",
+			mplrs.vertices,mplrs.rays,mplrs.bases,mplrs.intvertices);
+	fprintf(consumer.output, "*Elapsed time: %ld seconds.\n",
+		end.tv_sec - mplrs.start.tv_sec);
+
+	if (consumer.output_filename == NULL)
+		return;
+
+	if (mplrs.facets>0)
+	{
+		printf("%s\n", vol);
+		printf("*Totals: facets=%lu bases=%lu\n", mplrs.facets,
+			mplrs.bases);
+		free(vol);
+	}
+	else
+		printf("*Totals: vertices=%lu rays=%lu bases=%lu integer-vertices=%lu\n",
+		       mplrs.vertices,mplrs.rays,mplrs.bases,mplrs.intvertices);
+	printf("*Elapsed time: %ld seconds.\n", end.tv_sec - mplrs.start.tv_sec);
+}
+
+void post_output(const char *type, const char *data)
+{
+	outlist *out = (outlist *)malloc(sizeof(outlist));
+	out->type = dupstr(type);
+	out->data = dupstr(data);
+	out->next = mplrs.output_list;
+	mplrs.output_list = out;
+	if (mplrs.outnum++ > mplrs.maxbuf) /* buffer <maxbuf output lines */
+	{
+		process_output();	   /* before starting a flush */
+		clean_outgoing_buffers();
+	}
+}
+
+/* strdup */
+inline char *dupstr(const char *str)
+{
+	char *buf = (char *)malloc(sizeof(char)*(strlen(str)+1));
+	strcpy(buf,str);
+	return buf;
+}