/* mplrs.c: initial release of MPI version
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Author: Charles Jordan skip@ist.hokudai.ac.jp
Based on plrs.cpp by Gary Roumanis
Initial lrs Author: David Avis avis@cs.mcgill.ca
*/
/* #include "lrslib.h" */ /* included in mplrs.h */
#include "mplrs.h"
#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
/* global variables */
mplrsv mplrs; /* state of this process */
masterv master; /* state of the master */
consumerv consumer; /* state of the consumer */
int PLRS_DEBUG = 0;
/******************
* initialization *
******************/
int main(int argc, char **argv)
{
mplrs_init(argc, argv);
mprintf2(("%d: initialized on %s\n",mplrs.rank,mplrs.host));
if (mplrs.size<3)
return mplrs_fallback();
if (mplrs.rank == MASTER)
return mplrs_master();
else if (mplrs.rank == CONSUMER)
return mplrs_consumer();
else
return mplrs_worker();
}
void mplrs_init(int argc, char **argv)
{
int i,j;
int count;
char c;
time_t curt = time(NULL);
char *tim, *tim1;
/* make timestamp for filenames */
tim = ctime(&curt);
tim1 = tim+4;
tim1[3] = tim1[6] = '_';
tim1[15]='\0';
/* start MPI */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mplrs.rank);
MPI_Comm_size(MPI_COMM_WORLD, &mplrs.size);
MPI_Get_processor_name(mplrs.host, &count);
/* allocate mp for volume calculation, from plrs */
lrs_alloc_mp(mplrs.tN); lrs_alloc_mp(mplrs.tD);
lrs_alloc_mp(mplrs.Vnum); lrs_alloc_mp(mplrs.Vden);
lrs_alloc_mp(mplrs.Tnum); lrs_alloc_mp(mplrs.Tden);
itomp(ZERO, mplrs.Vnum); itomp(ONE, mplrs.Vden);
gettimeofday(&mplrs.start, NULL);
mplrs_initstrucs(); /* initialize default values of globals */
/* process commandline arguments, set input and output files */
mplrs_commandline(argc, argv);
if (mplrs.rank == MASTER || mplrs.rank == CONSUMER)
{
/* open input file for reading on master, open output file for
* writing on consumer, open histogram file for writing on
* master -- if these files are to be used.
*/
mplrs_initfiles();
if (mplrs.rank == MASTER)
master_sendfile();
}
else
{
/* receive input file from master */
MPI_Recv(&count, 1, MPI_INT, 0, 20, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
mplrs.input = (char *)malloc(sizeof(char)*(count+1));
MPI_Recv(mplrs.input, count, MPI_CHAR, 0, 20,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
mplrs.input[count] = '\0';
/* get number of chars needed for worker files */
j = mplrs.size;
for (i=1; j>9; i++)
j = j/10;
i += 6+strlen(tim1); /* mplrs_TIMESTAMP */
i += strlen(mplrs.tfn_prefix) + strlen(mplrs.input_filename) +
i + 6; /* _%d.ine\0 */
mplrs.tfn = (char *)malloc(sizeof(char) * i);
sprintf(mplrs.tfn, "%smplrs_%s%s_%d.ine",
mplrs.tfn_prefix, tim1,
mplrs.input_filename,
mplrs.rank);
/* flatten directory structure in mplrs.input_filename
* for mplrs.tfn, to prevent writing to non-existent
* subdirectories in e.g. /tmp
*/
i = strlen(mplrs.tfn_prefix) + 6 + strlen(tim1)+1;
j = strlen(mplrs.tfn);
for (; i<j; i++)
{
c = mplrs.tfn[i];
if (c == '/' || c == '\\')
mplrs.tfn[i] = '_';
}
}
/* setup signals --
* TERM checkpoint to checkpoint file, or output file if none & exit
* HUP ditto
*/
signal(SIGTERM, mplrs_caughtsig);
signal(SIGHUP, mplrs_caughtsig);
}
/* if we catch a signal, set a flag to exit */
void mplrs_caughtsig(int sig)
{
if (mplrs.caughtsig<2)
mplrs.caughtsig = 1;
signal(sig, mplrs_caughtsig);
return;
}
/* if we've caught a signal, tell the master about it */
void mplrs_handlesigs(void)
{
float junk = 0;
if (mplrs.caughtsig != 1)
return;
/* we want to stop, so no need to hurry and queue the send */
MPI_Send(&junk, 1, MPI_FLOAT, MASTER, 9, MPI_COMM_WORLD);
mplrs.caughtsig = 2; /* handled */
return;
}
/* send the contents of the input file to all workers */
void master_sendfile(void)
{
char *buf;
int count=0;
int c, i;
c = fgetc(master.input);
while (c!=EOF)
{
count++;
c = fgetc(master.input);
}
buf = (char *)malloc(sizeof(char)*(count));
fseek(master.input, 0, SEEK_SET);
for (i=0; i<count; i++)
{
c = fgetc(master.input);
buf[i]=c;
}
for (i=0; i<mplrs.size; i++)
{
if (i==MASTER || i==CONSUMER)
continue;
mprintf2(("M: Sending input file to %d\n", i));
MPI_Send(&count, 1, MPI_INT, i, 20, MPI_COMM_WORLD);
MPI_Send(buf, count, MPI_CHAR, i, 20, MPI_COMM_WORLD);
}
fseek(master.input, 0, SEEK_SET);
free(buf);
}
/* initialize default values of global structures, before commandline */
void mplrs_initstrucs(void)
{
mplrs.cobasis_list = NULL;
mplrs.caughtsig = 0;
mplrs.rays = 0;
mplrs.vertices = 0;
mplrs.bases = 0;
mplrs.facets = 0;
mplrs.intvertices = 0;
mplrs.my_tag = 100;
mplrs.tfn_prefix = DEF_TEMP;
mplrs.tfn = NULL;
mplrs.tfile = NULL;
mplrs.input_filename = DEF_INPUT;
mplrs.input = NULL;
mplrs.output_list = NULL;
mplrs.outnum = 0;
mplrs.maxbuf = DEF_MAXBUF; /* maximum # lines to buffer */
mplrs.countonly = 0;
master.cobasis_list = NULL;
master.size_L = 0;
master.tot_L = 0;
master.num_empty = 0;
master.num_producers = 0;
master.checkpointing = 0;
master.lmin = DEF_LMIN;
master.lmax = DEF_LMAX;
master.scalec = DEF_SCALEC;
master.initdepth = DEF_ID;
master.maxdepth = DEF_MAXD;
master.maxcobases = DEF_MAXC;
master.maxncob = DEF_MAXNCOB;
master.time_limit = 0;
master.hist_filename = DEF_HIST;
master.hist = NULL;
master.doing_histogram = 0;
master.freq_filename = DEF_FREQ;
master.freq = NULL;
master.restart_filename = DEF_RESTART;
master.restart = NULL;
master.checkp_filename = DEF_CHECKP;
master.checkp = NULL;
master.stop_filename = DEF_STOP;
master.stop = NULL;
master.input = NULL;
consumer.prodreq = NULL;
consumer.prodibf = NULL;
consumer.incoming = NULL;
consumer.output_filename = DEF_OUTPUT;
consumer.output = stdout;
consumer.num_producers = 0;
consumer.checkpoint = 0;
consumer.waiting_initial = 1;
}
/* process commandline arguments */
void mplrs_commandline(int argc, char **argv)
{
int i, arg, firstfile=1;
for (i=1; i<argc; i++)
{
if (!strcmp(argv[i], "-lmin"))
{
arg = atoi(argv[i+1]);
i++;
if (arg < 1 )
bad_args();
master.lmin = arg;
if (master.lmin > master.lmax)
master.lmax = arg;
continue;
}
else if (!strcmp(argv[i], "-lmax"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1 )
bad_args();
master.lmax = arg;
if (master.lmin > master.lmax)
master.lmin = arg;
continue;
}
else if (!strcmp(argv[i], "-scale"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1)
bad_args();
master.scalec = arg;
continue;
}
else if (!strcmp(argv[i], "-hist"))
{
master.hist_filename = argv[i+1];
i++;
continue;
}
else if (!strcmp(argv[i], "-freq"))
{
master.freq_filename = argv[i+1];
i++;
continue;
}
else if (!strcmp(argv[i], "-stopafter"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1)
bad_args();
master.maxncob = arg;
continue;
}
else if (!strcmp(argv[i], "-countonly"))
{
mplrs.countonly = 1;
continue;
}
else if (!strcmp(argv[i], "-maxbuf"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1)
bad_args();
mplrs.maxbuf = arg;
continue;
}
else if (!strcmp(argv[i], "-id"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<0)
bad_args();
master.initdepth = arg;
continue;
}
else if (!strcmp(argv[i], "-maxd"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1)
bad_args();
master.maxdepth = arg;
continue;
}
else if (!strcmp(argv[i], "-maxc"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<0)
bad_args();
master.maxcobases = arg;
continue;
}
else if (!strcmp(argv[i], "-checkp"))
{
master.checkp_filename = argv[i+1];
i++;
continue;
}
else if (!strcmp(argv[i], "-stop"))
{
master.stop_filename = argv[i+1];
i++;
continue;
}
else if (!strcmp(argv[i], "-time"))
{
arg = atoi(argv[i+1]);
i++;
if (arg<1)
bad_args();
master.time_limit = arg;
continue;
}
else if (!strcmp(argv[i], "-restart"))
{
master.restart_filename = argv[i+1];
i++;
continue;
}
else if (!strcmp(argv[i], "-temp"))
{
mplrs.tfn_prefix = argv[i+1];
i++;
continue;
}
else if (firstfile == 1)
{
mplrs.input_filename = argv[i];
firstfile = 2;
}
else if (firstfile == 2)
{
consumer.output_filename = argv[i];
firstfile = 3;
}
else
bad_args();
}
if (master.lmax == -1)
master.lmax = (master.lmin>0? master.lmin: 0);
if (mplrs.input_filename==NULL) /* need an input file */
bad_args();
if ((master.stop_filename!=NULL || master.time_limit!=0) &&
master.checkp_filename==NULL)
bad_args(); /* need checkpoint file if stop condition given */
}
/* open input file on master, histogram (if exists) on master,
* output (if exists) on consumer
*/
void mplrs_initfiles(void)
{
if (mplrs.rank == MASTER)
{
master.input = fopen(mplrs.input_filename, "r");
if (master.input == NULL)
{
printf("Unable to open %s for reading [%s].\n",
mplrs.input_filename, mplrs.host);
/* MPI_Finalize(); */
exit(0);
}
if (master.hist_filename != NULL)
{
master.hist = fopen(master.hist_filename, "w");
if (master.hist == NULL)
{
printf("Unable to open %s for writing [%s].\n",
master.hist_filename, mplrs.host);
/* MPI_Finalize(); */
exit(0);
}
master.doing_histogram = 1;
mprintf2(("M: Prepared histogram (%s)\n", master.hist_filename));
}
if (master.freq_filename != NULL)
{
master.freq = fopen(master.freq_filename, "w");
if (master.freq == NULL)
{
printf("Unable to open %s for writing [%s].\n",
master.freq_filename, mplrs.host);
exit(0);
}
mprintf2(("M: Prepared frequency file (%s)\n",
master.freq_filename));
}
if (master.checkp_filename !=NULL)
{
master.checkp = fopen(master.checkp_filename, "w");
if (master.checkp == NULL)
{
printf("Unable to open %s for writing [%s].\n",
master.checkp_filename, mplrs.host);
/* MPI_Finalize(); */
exit(0);
}
mprintf2(("M: Prepared checkpoint file (%s)\n",
master.checkp_filename));
}
if (master.restart_filename != NULL)
{
master.restart = fopen(master.restart_filename, "r");
if (master.restart == NULL)
{
printf("Unable to open %s for reading [%s].\n",
master.restart_filename, mplrs.host);
/* MPI_Finalize(); */
exit(0);
}
mprintf2(("M: Opened restart file (%s)\n",
master.restart_filename));
}
}
if (mplrs.rank == CONSUMER)
{
if (consumer.output_filename == NULL)
return;
consumer.output = fopen(consumer.output_filename, "w");
if (consumer.output == NULL)
{
printf("Unable to open %s for writing [%s].\n",
consumer.output_filename, mplrs.host);
/* MPI_Finalize(); */
exit(0);
}
}
}
/* Bad commandline arguments. Complain and die. */
void bad_args(void)
{
if (mplrs.rank == CONSUMER)
printf("Invalid arguments.\n%s\n", USAGE);
MPI_Finalize();
exit(0);
}
/* fallback -- meaningless if <3 processes
* better would be to fallback to normal lrs if <4 processes
*/
int mplrs_fallback(void)
{
if (mplrs.rank==0)
printf("mplrs requires at least 3 processes.\n");
MPI_Finalize();
exit(0);
}
/**********
* master *
**********/
int mplrs_master(void)
{
int i;
int loopiter = 0; /* do some things only sometimes */
MPI_Request ign = MPI_REQUEST_NULL; /* a request we'll ignore */
timeval cur, last; /* for histograms */
int flag = 0;;
int phase = 1; /* In phase1? */
int done = -1; /* need a negative int to send to finished workers */
float junk=0; /* need a buffer for incoming signal reports */
int ncob=0; /* for printing sizes of sub-problems and for -stopafter*/
unsigned long tot_ncob = 0;
int want_stop = 0;
gettimeofday(&last, NULL);
master.num_producers = 0; /* nobody working right now */
master.act_producers = (unsigned int *)malloc(sizeof(unsigned int)*mplrs.size);
master.live_workers = mplrs.size - 2;
master.workin = (int *)malloc(sizeof(int)*mplrs.size);
master.mworkers = (MPI_Request *)malloc(sizeof(MPI_Request)*mplrs.size);
master.incoming = NULL;
master.sigcheck = (MPI_Request *)malloc(sizeof(MPI_Request)*mplrs.size);
if (master.restart!=NULL)
{
master_restart();
if (master.size_L>0)
phase = 0;
}
for (i=0; i<mplrs.size; i++)
{
master.act_producers[i] = 0;
if (i==MASTER)
continue;
MPI_Irecv(&junk, 1, MPI_FLOAT, i, 9, MPI_COMM_WORLD,
master.sigcheck+i);
if (i==CONSUMER)
continue;
MPI_Irecv(master.workin+i, 1, MPI_UNSIGNED, i, 6, MPI_COMM_WORLD,
master.mworkers+i);
}
while ((master.cobasis_list!=NULL && !master.checkpointing && !want_stop) || master.num_producers>0 || master.live_workers>0)
{
loopiter++;
/* sometimes check if we should update histogram etc */
if (!(loopiter&0x1ff))
{
if (master.maxncob>0 && master.maxncob<=tot_ncob)
want_stop = 1;
if (master.doing_histogram)
print_histogram(&cur, &last);
}
/* sometimes check if we should checkpoint */
if (!(loopiter&0x7ff) && !master.checkpointing)
{
if (master.stop_filename!=NULL || master.time_limit!=0)
check_stop();
master_checksigs();
}
recv_producer_lists();
/* check if anyone wants work */
for (i=0; i<mplrs.size; i++)
{ /* consumer and master don't want work */
if (i==CONSUMER || i==MASTER)
continue;
/* workers that have exited can't work */
if (master.mworkers[i]==MPI_REQUEST_NULL)
continue;
if (master.num_producers>0 && master.cobasis_list==NULL)
{
break; /* no work to give now, but some may
* appear later
*/
}
if (phase==1 && i!=INITIAL)
continue; /* INITIAL gets the first bit */
MPI_Test(master.mworkers+i, &flag, MPI_STATUS_IGNORE);
if (!flag)
continue; /* i is not ready for more work */
ncob = master.workin[i];
tot_ncob+=ncob;
mprintf2(("M: %d looking for work\n", i));
if ((master.cobasis_list!=NULL || phase==1) &&
!master.checkpointing && !want_stop)
{ /* and not checkpointing! */
send_work(i,phase);
MPI_Irecv(master.workin+i, 1, MPI_UNSIGNED,i, 6,
MPI_COMM_WORLD, master.mworkers+i);
phase=0;
if (master.freq!=NULL && ncob>0)
fprintf(master.freq, "%d\n", ncob);
continue;
}
/* else tell worker we've finished */
mprintf(("M: Saying goodbye to %d, %d left\n", i,
master.live_workers-1));
MPI_Isend(&done, 1, MPI_INT, i, 8, MPI_COMM_WORLD,&ign);
MPI_Request_free(&ign);
master.live_workers--;
if (master.freq!=NULL && ncob>0)
fprintf(master.freq, "%d\n", ncob);
}
clean_outgoing_buffers();
}
if (master.checkpointing)
master_checkpoint();
send_master_stats();
MPI_Finalize();
free(master.workin);
free(master.mworkers);
free(master.act_producers);
free(master.sigcheck);
return 0;
}
/* prepare to receive remaining cobases from target.
* Since we don't yet know the size of buffers needed, we
* only Irecv the header and will Irecv the actual cobases later
*/
void master_add_incoming(int target)
{
msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*3);
msg->buf = (void **)malloc(sizeof(void *)*3);
msg->buf[0] = (int *)malloc(sizeof(int) * 3); /* (strlen,lengths,tag) */
msg->buf[1] = NULL; /* sizes not known yet */
msg->buf[2] = NULL;
msg->count = 3;
msg->target = target;
msg->queue = 1;
msg->tags = NULL;
msg->sizes = NULL;
msg->types = NULL;
msg->next = master.incoming;
master.incoming = msg;
MPI_Irecv(msg->buf[0], 3, MPI_INT, target, 10, MPI_COMM_WORLD,msg->req); return;
}
/* check our list of incoming messages from producers about cobases
* to add to L. Add any from messages that have completed.
* If the header has completed (msg->queue==1 and header completed),
* add the remaining MPI_Irecv's.
* Update num_producers to keep track of how many messages the master
* is owed (workers are not allowed to exit until num_producers==0 and
* L is empty).
* Also update size_L
*/
void recv_producer_lists(void)
{
msgbuf *msg, *prev=NULL, *next;
int *header;
int flag;
for (msg = master.incoming; msg; msg=next)
{
next = msg->next;
MPI_Test(msg->req, &flag, MPI_STATUS_IGNORE);
if (!flag) /* header has not completed yet */
{
prev = msg;
continue;
}
header = (int *)msg->buf[0];
if (msg->queue) /* header completed, and need to Irecv now */
{
if (header[0]==-1) /* producer returns NOTHING */
{
master.num_producers--;
master.act_producers[msg->target]--;
free_msgbuf(msg);
if (prev)
prev->next = next;
else
master.incoming = next;
continue;
}
msg->buf[1]= (char*)malloc(sizeof(char)*header[1]);
msg->buf[2]= (int *)malloc(sizeof(int)*header[0]);
MPI_Irecv(msg->buf[1], header[1], MPI_CHAR, msg->target,
header[2], MPI_COMM_WORLD, msg->req+1);
MPI_Irecv(msg->buf[2], header[0], MPI_INT, msg->target,
header[2], MPI_COMM_WORLD, msg->req+2);
msg->queue=0;
prev = msg;
continue;
}
/* header completed, did the rest? */
MPI_Testall(2, msg->req+1, &flag, MPI_STATUSES_IGNORE);
if (!flag) /* not yet */
{
prev = msg;
continue;
}
mprintf2(("M: %d returned non-empty producer list (%d, %d)\n",
msg->target, header[0], header[1]));
process_returned_cobases(msg);
mprintf2(("M: Now have size_L=%lu\n",master.size_L));
if (prev)
prev->next = next;
else
master.incoming = next;
master.num_producers--;
master.act_producers[msg->target]--;
free_msgbuf(msg);
}
return;
}
/* msg is a completed, non-empty buffer containing cobases to add to
* L. Process it, add them to L, and update size_L
* Basically the inverse of return_unfinished_cobases()
*/
void process_returned_cobases(msgbuf *msg)
{
int *header = (int *)msg->buf[0];
char *str = (char *)msg->buf[1];
int *lengths = (int *)msg->buf[2];
int i;
char *cob;
for (i=0; i<header[0]; i++)
{
cob = (char *)malloc(sizeof(char)*(lengths[i]+1));
strncpy(cob, str, lengths[i]);
cob[lengths[i]] = '\0';
str+=lengths[i];
mprintf2(("M: Adding to L: %s\n",cob));
master.cobasis_list = addlist(master.cobasis_list, cob);
}
master.size_L += header[0];
master.tot_L += header[0];
return;
}
/* send one unit from L to target. if phase!=0, this is the first
* unit we're sending (i.e. phase 1). usually, phase==0.
* Parameters are scaled and sent in the header.
*/
void send_work(int target, int phase)
{
slist *cob;
msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
int *header;
msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
msg->buf = (void **)malloc(sizeof(void *)*2);
/*{length of work string, int maxdepth, int maxcobases, 5xfuture use} */
msg->buf[0] = (int *)malloc(sizeof(int) * 8);
header = (int *)msg->buf[0];
if (phase==0) /* normal */
{
cob = master.cobasis_list;
master.cobasis_list = cob->next;
header[0] = strlen((char *)cob->data);
msg->buf[1] = cob->data;
setparams(header); /* scale if needed */
master.size_L--;
if (master.size_L == 0)
master.num_empty++;
mprintf(("M: Sending work to %d (%d,%d,%d) %s\n",
target, header[0], header[1], header[2],
(char*)msg->buf[1]));
msg->count = 2;
free(cob);
}
else /* phase 1 */
{
header[0] = 0; /* header[0]==0 means initial phase 1 */
header[1] = master.initdepth;
header[2] = master.maxcobases;
mprintf(("M: Sending phase 1 to %d (%d,%d)\n", target,
header[1], header[2]));
msg->buf[1] = NULL;
msg->count = 1;
}
msg->target = target;
msg->queue = 0;
msg->tags = NULL;
msg->sizes = NULL;
msg->types = NULL;
/* ready to send */
MPI_Isend(header, 8, MPI_INT, target, 1, MPI_COMM_WORLD, msg->req);
if (phase==0)
MPI_Isend(msg->buf[1], header[0], MPI_CHAR, target, 1,
MPI_COMM_WORLD, msg->req+1);
master_add_incoming(target); /* prepare to receive remaining cobases */
msg->next = mplrs.outgoing;
mplrs.outgoing = msg;
master.act_producers[target]++;
master.num_producers++;
return;
}
/* header is a work header (length, maxd, maxc) not yet set.
* Set the parameters (maxd, maxc) as desired.
*/
void setparams(int *header)
{
/* if L is too small, use maxdepth */
if (master.lmin>0 && (master.size_L < mplrs.size*master.lmin))
header[1] = master.maxdepth;
else /* don't have too small L, so no maxdepth */
header[1] = 0;
header[2] = master.maxcobases;
if (master.lmax>0 && (master.size_L > mplrs.size * master.lmax))
header[2] = header[2] * master.scalec;
}
/* check if we want to stop now.
* if master.stop_filename exists or time limit exceeded,
* set master.checkpointing = 1 and inform consumer.
* this is not an immediate stop -- we wait for current workers to
* complete the tasks they've been assigned, stopping after that.
*/
void check_stop(void)
{
int check[3] = {CHECKFLAG,0,0};
MPI_Request ign;
struct timeval cur;
int flag = 0;
if (master.stop_filename)
{
mprintf2(("M: checking stop file %s\n", master.stop_filename));
master.stop = fopen(master.stop_filename, "r");
if (master.stop!=NULL)
{
flag=1;
fclose(master.stop);
}
}
if (master.time_limit!=0)
{
mprintf2(("M: checking if time exceeded\n"));
gettimeofday(&cur, NULL);
if (cur.tv_sec - mplrs.start.tv_sec > master.time_limit)
flag=1;
}
if (flag!=0)
{
mprintf(("M: Stop condition detected, checkpointing!\n"));
master.checkpointing = 1;
MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
MPI_Request_free(&ign);
}
}
/* check if we've caught a signal, or we've received a message from someone
* that has. If so, we want to checkpoint like above
*/
void master_checksigs(void)
{
int i, flag, size=mplrs.size;
int check[3] = {CHECKFLAG,0,0};
MPI_Request ign;
if (mplrs.caughtsig == 1)
{
mprintf(("M: I caught signal, checkpointing!\n"));
MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
master.checkpointing = 1;
return;
}
for (i=1; i<size; i++)
{
MPI_Test(master.sigcheck+i, &flag, MPI_STATUS_IGNORE);
if (flag)
{
mprintf(("M: %d caught signal, checkpointing!\n",i));
MPI_Isend(check, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD, &ign);
master.checkpointing = 1;
return;
}
}
}
/* we're checkpointing, receive stats from consumer and output checkpoint file
*/
/* two cases: we have a file to checkpoint to (normal)
* or, we caught a signal and want to checkpoint but have no checkpoint file
* in this case we append to the output file via the consumer, with
* comments on where to cut
*/
/* mplrs1 format: counts are 'unsigned int' (%d unfortunately)
* mplrs2 format: counts are 'unsigned long' (%lu)
* so mplrs1 files can be read by mplrs2 readers not necc. reverse
*/
void master_checkpoint(void)
{
int fin = -1;
mprintf(("M: making checkpoint file\n"));
recv_counting_stats(CONSUMER);
if (master.checkp_filename != NULL)
{
MPI_Send(&fin, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
master_checkpointfile();
return;
}
else
{
fin = 1;
MPI_Send(&fin, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
master_checkpointconsumer();
return;
}
}
/* note: master_checkpointconsumer and master_checkpointfile should
* produce the *same* format. First two lines of checkpointconsumer
* done by the consumer directly for now.
*/
void master_checkpointconsumer(void)
{
int len;
char *str;
slist *list, *next;
for (list=master.cobasis_list; list; list=next)
{
next = list->next;
str = (char*)list->data;
len = strlen(str)+1; /* include \0 */
MPI_Send(&len, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
MPI_Send(str, len, MPI_CHAR, CONSUMER, 1, MPI_COMM_WORLD);
free(str);
free(list);
}
len = -1;
MPI_Send(&len, 1, MPI_INT, CONSUMER, 1, MPI_COMM_WORLD);
}
void master_checkpointfile(void)
{
slist *list, *next;
char *vol = cprat("", mplrs.Vnum, mplrs.Vden);
fprintf(master.checkp, "mplrs3\n%lu %lu %lu %lu %lu\n%s\n",
mplrs.rays, mplrs.vertices, mplrs.bases, mplrs.facets,
mplrs.intvertices,vol);
free(vol);
for (list=master.cobasis_list; list; list=next)
{
next = list->next;
fprintf(master.checkp, "%s\n", (char *)list->data);
free(list->data);
free(list);
master.size_L--;
}
fclose(master.checkp);
return;
}
/* we want to restart. load L and counting stats from restart file,
* send counting stats to consumer (after notifying consumer of restart)
*/
void master_restart(void)
{
char *line=NULL;
char *vol=NULL;
size_t size=0, vsize=0;
ssize_t len=0;
int restart[3] = {RESTARTFLAG,0,0};
int ver;
/* check 'mplrs1' header */
len = getline(&line, &size, master.restart);
if (len!=7 || (strcmp("mplrs1\n",line) && strcmp("mplrs2\n",line) &&
strcmp("mplrs3\n",line)))
{
printf("Unknown checkpoint format\n");
/* MPI_Finalize(); */
exit(0);
}
mprintf2(("M: found checkpoint header\n"));
sscanf(line,"mplrs%d\n",&ver);
/* get counting stats */
fscanf(master.restart, "%lu %lu %lu %lu %lu\n",
&mplrs.rays, &mplrs.vertices, &mplrs.bases, &mplrs.facets,
&mplrs.intvertices);
if (ver<3) /* volume added in mplrs3 */
printf("*Old checkpoint file, volume may be incorrect\n");
else /* get volume */
{
len = getline(&vol, &vsize, master.restart);
if (len<=1)
{
printf("Broken checkpoint file\n");
exit(0);
}
vol[len-1] = '\0'; /* remove '\n' */
plrs_readrat(mplrs.Tnum, mplrs.Tden, vol);
copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
1L, mplrs.Vnum, mplrs.Vden);
free(vol);
}
/* get L */
while((len = getline(&line, &size, master.restart))!= -1)
{
if (line[0]=='\n') /* ignore blank lines */
{
free(line);
line = NULL;
size=0;
continue;
}
line[strlen(line)-1]='\0'; /* replace \n by \0 */
master.cobasis_list = addlist(master.cobasis_list, line);
master.size_L++;
line = NULL;
size = 0;
}
master.tot_L = master.size_L; /* maybe should save and retrieve */
mprintf(("M: Restarted with |L|=%lu\n",master.size_L));
fclose(master.restart);
MPI_Send(restart, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD);
send_counting_stats(CONSUMER);
mplrs.rays = 0;
mplrs.vertices = 0;
mplrs.bases = 0;
mplrs.facets = 0;
mplrs.intvertices = 0;
return;
}
/* check if we should update the histogram and then do it */
void print_histogram(timeval *cur, timeval *last)
{
float sec;
int i;
int act;
gettimeofday(cur,NULL);
if (cur->tv_sec > last->tv_sec)
{
sec = (float)(cur->tv_sec - mplrs.start.tv_sec) + ((float)(cur->tv_usec - mplrs.start.tv_usec))/1000000;
act=0;
for (i=0; i<mplrs.size; i++)
if (master.act_producers[i])
act++;
/* time (seconds), number of active producers, number of
* producers owing us a message about remaining cobases,
* 0, 0 (existed in mplrs.cpp but no longer)
*/
fprintf(master.hist, "%f %d %lu %d %d %d %lu\n",
sec, act, master.size_L, master.num_producers, 0, 0, master.tot_L);
fflush(master.hist);
last->tv_sec = cur->tv_sec;
last->tv_usec = cur->tv_usec;
}
}
/**********
* worker *
**********/
int mplrs_worker(void)
{
char *starting_cobasis;
/* header for incoming work:
* {length of work string, int maxdepth, int maxcobases, 5xfuture use}
*/
int header[8]={0,0,0,0,0,0,0,0};
MPI_Request req = MPI_REQUEST_NULL;
unsigned int ncob=0; /* used for # cobases in prev. job */
unsigned int tot_ncob=0;
int len;
int flag;
while (1)
{
ncob = mplrs.bases - tot_ncob; /* #cobases in last job */
tot_ncob = mplrs.bases; /* #cobases done so far */
/* check signals */
mplrs_handlesigs();
/* ask for work */
mprintf2(("%d: Asking master for work\n",mplrs.rank));
MPI_Isend(&ncob, 1, MPI_UNSIGNED, MASTER, 6, MPI_COMM_WORLD,
&req);
flag = 0;
while (1) /* was MPI_Wait(&req, MPI_STATUS_IGNORE); */
{
MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
if (flag)
break;
clean_outgoing_buffers();
}
starting_cobasis = NULL;
/* get response */
MPI_Recv(header, 8, MPI_INT, MASTER, MPI_ANY_TAG,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
mprintf2(("%d: Message received from master\n",mplrs.rank));
len = header[0];
if (len==-1) /* no more work to do */
return mplrs_worker_finished();
if (len>0)
{
starting_cobasis = (char*)malloc(sizeof(char)*(len+1));
MPI_Recv(starting_cobasis, len, MPI_CHAR, MASTER,
MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
starting_cobasis[len] = '\0';
}
/* do work */
do_work(header, starting_cobasis);
free(starting_cobasis);
/* send output and unfinished cobases */
process_output();
return_unfinished_cobases();
clean_outgoing_buffers(); /* check buffered sends,
* free if finished
*/
}
return 0; /* unreachable */
}
/* This worker has finished. Tell the consumer, send counting stats
* and exit.
*/
int mplrs_worker_finished(void)
{
int done[3] = {-1,-1,-1};
mprintf((" %d: All finished! Informing consumer.\n",mplrs.rank));
while (mplrs.outgoing) /* needed? negligible in any case */
{
clean_outgoing_buffers();
}
MPI_Send(&done, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD);
send_counting_stats(CONSUMER);
MPI_Finalize();
return 0;
}
/* Go through our outgoing MPI_Isends, free anything that has completed.
* Also, if any of these are queued and the header has completed, then
* send the remaining data.
* Don't use with incoming buffers.
*/
void clean_outgoing_buffers(void)
{
msgbuf *msg, *next, *prev=NULL;
for (msg = mplrs.outgoing; msg; msg=next)
{
next = msg->next;
if (!outgoing_msgbuf_completed(msg))
{
prev = msg;
continue;
}
if (prev)
prev->next = next;
else
mplrs.outgoing = next;
free_msgbuf(msg);
}
}
/* header[1] gives maxdepth, header[2] gives maxcobases,
* if header[0] > 0,
* starting_cobasis gives the starting cobasis.
* if header[0] == 0, starting at initial input (phase 1)
*/
void do_work(const int *header, const char *starting_cobasis)
{
char *argv[] = {mplrs.tfn};
/* prepare input file */
mplrs.initializing = 0;
mplrs.tfile = fopen(mplrs.tfn, "w");
fprintf(mplrs.tfile, "%s", mplrs.input);
mprintf3(("%d: Received work (%d,%d,%d)\n",mplrs.rank,header[0],
header[1],header[2]));
if (header[0]>0)
fprintf(mplrs.tfile, "\nmindepth 0\nrestart %s\n",
starting_cobasis);
else
mplrs.initializing = 1; /* phase 1 output handled different */
if (header[1]>0)
fprintf(mplrs.tfile, "\nmaxdepth %d\n", header[1]);
if (header[2]>0)
fprintf(mplrs.tfile, "\nmaxcobases %d\n", header[2]);
if (mplrs.countonly == 1)
fprintf(mplrs.tfile, "countonly\n");
fclose(mplrs.tfile);
mprintf2(("%d: Calling lrs_main\n",mplrs.rank));
lrs_main(1, argv);
mprintf2(("%d: lrs_main returned\n",mplrs.rank));
if (remove(mplrs.tfn) != 0) /* UNsynchronized printf -- should fix */
printf("Error deleting thread file!\n");
}
/* The worker has finished its work. Process the output, preparing
* and sending the output to the consumer, and preparing the unfinished
* cobases for return_unfinished_cobases().
*/
void process_output(void)
{
outlist *out = mplrs.output_list, *next;
char *out_string=NULL; /* for output file if exists */
const char *type; /* because plrs_output is C++ at the moment */
const char *data; /* because plrs_output is C++ at the moment */
int len = 1024;
mplrs.outnum = 0; /* clearing buffer */
mplrs.output_list = NULL;
out_string = (char *)malloc(sizeof(char)*len);
out_string[0]='\0';
/* reverse when initializing to get correct order */
if (mplrs.initializing)
out = reverse_list(out);
while (out)
{
type = out->type;
data = out->data;
if (!strcmp(type, "vertex"))
out_string = append_out(out_string, &len, data);
else if (!strcmp(type, "ray"))
out_string = append_out(out_string, &len, data);
else if (!strcmp(type, "cobasis"))
process_cobasis(data);
else if (!strcmp(type, "V cobasis"))
out_string = append_out(out_string, &len, data);
else if (!strcmp(type, "facet count"))
mplrs.facets += atoi(data);
else if (!strcmp(type, "ray count"))
mplrs.rays += atoi(data);
else if (!strcmp(type, "basis count"))
mplrs.bases += atoi(data);
else if (!strcmp(type, "vertex count"))
mplrs.vertices += atoi(data);
else if (!strcmp(type, "integer vertex count"))
mplrs.intvertices += atoi(data);
else if (!strcmp(type, "volume"))
{
plrs_readrat(mplrs.Tnum, mplrs.Tden, data);
copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
1L, mplrs.Vnum, mplrs.Vden);
}
else if (!strcmp(type, "options warning"))
{
/* only do warnings once, otherwise repeated */
if (mplrs.initializing)
out_string = append_out(out_string, &len, data);
}
else if (!strcmp(type, "header"))
{
/*only do header if initializing, otherwise repeated*/
if (mplrs.initializing)
out_string = append_out(out_string, &len, data);
}
else if (!strcmp(type, "debug"))
{
out_string = append_out(out_string, &len, data);
}
next = out->next;
free(out->type);
free(out->data);
free(out);
out = next;
}
if (strlen(out_string)>0 && strcmp(out_string, "\n"))
send_output(1, out_string);
else
free(out_string);
}
/* send this string to the consumer to output.
* If dest!=0, then it goes to the output file (stdout if no output file).
* If dest==0, then it goes to stdout.
*
* The pointer str is surrendered to send_output and should not be changed
* It will be freed once the send is complete.
*/
/* str should not be NULL */
void send_output(int dest, char *str)
{
msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
int *header = (int *)malloc(sizeof(int)*3);
header[0] = dest;
header[1] = strlen(str);
header[2] = mplrs.my_tag; /* to ensure the dest/str pair
* remains intact even if another
* send happens in between
*/
msg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
msg->buf = (void **)malloc(sizeof(void *)*2);
msg->buf[0] = header;
msg->buf[1] = str;
msg->count = 2;
msg->target = CONSUMER;
msg->queue = 1;
msg->tags = (int *)malloc(sizeof(int)*2);
msg->sizes = (int *)malloc(sizeof(int)*2);
msg->types = (MPI_Datatype *)malloc(sizeof(MPI_Datatype)*2);
msg->types[1] = MPI_CHAR;
msg->sizes[1] = header[1]+1;
msg->tags[1] = mplrs.my_tag;
mplrs.my_tag++;
msg->next = mplrs.outgoing;
mplrs.outgoing = msg;
MPI_Isend(header, 3, MPI_INT, CONSUMER, 7, MPI_COMM_WORLD,
msg->req);
}
/* called from process_output to handle a 'cobasis',
* add to queue to return to master
*/
/* awful string-hacking, basically copied from plrs processCobasis() */
/* First, if first characters are 'F#', it's a hull. otherwise it's not.
* Then, remove ignore_chars.
* Then, remove everything starting from the first 'I'
* Then, copy everything verbatim, except:
* if it's a hull, replace everything between second and third spaces
* by '0'
* if it's not a hull, replace everything between third and fourth
* spaces by '0'
* (this is resetting the depth to be 0 for a restart)
*/
void process_cobasis(const char *newcob)
{
int nlen = strlen(newcob);
char *buf = (char *)malloc(sizeof(char) * (nlen+1));
int i,j,k;
int num_spaces=0; /* we count the number of spaces */
char ignore_chars[] = "#VRBh=facetsFvertices/rays";
char c;
int num_ignore = strlen(ignore_chars);
int hull = 0;
int replace = 0;
mprintf3(("%d: process_cobasis( %s )", mplrs.rank, newcob));
if (nlen>1 && newcob[0]=='F' && newcob[1]=='#')
hull = 1;
for (i=0,j=0; i<=nlen; i++)
{
c = newcob[i];
/* ignore ignore_chars */
for (k=0; k<=num_ignore; k++)
if (c == ignore_chars[k])
break;
if (k<=num_ignore)
continue;
if (c=='I')
break;
if (c==' ') /* count spaces to set depth to 0 for restart */
{
num_spaces++;
if ( (num_spaces==2 && hull==1) ||
(num_spaces==3 && hull==0) )
replace = 1;
else if ( (num_spaces==3 && hull==1) ||
(num_spaces==4 && hull==0) )
replace = 0;
buf[j++] = c; /* copy spaces */
continue;
}
if (replace == 1)
buf[j++]='0';
else
buf[j++]=c;
}
buf[j]='\0';
mprintf3((" produced %s\n",buf));
mplrs.cobasis_list = addlist(mplrs.cobasis_list, buf);
}
inline slist *addlist(slist *list, void *buf)
{
slist *n = (slist *)malloc(sizeof(struct slist));
n->data = buf;
n->next = list;
return n;
}
/* mplrs.cobasis_list may have things to send to the master.
* Send the header, and then the cobases to add to L.
*/
void return_unfinished_cobases(void)
{
int listsize;
slist *list, *next;
int *lengths=NULL;
char *cobases=NULL;
int size = 0;
int i;
int start;
/* header is (strlen(cobases), length of lengths, mplrs.my_tag) */
int *header = (int *)malloc(sizeof(int)*3);
msgbuf *msg = (msgbuf *)malloc(sizeof(msgbuf));
msg->target = MASTER;
for (listsize=0, list=mplrs.cobasis_list; list; list=list->next)
{
listsize++;
size += strlen((char *)list->data);
}
if (listsize == 0)
{
header[0] = -1;
header[1] = -1;
header[2] = -1;
msg->buf = (void **)malloc(sizeof(void *));
msg->buf[0] = header;
msg->count = 1;
msg->req = (MPI_Request *)malloc(sizeof(MPI_Request));
msg->queue = 0;
msg->tags = NULL;
msg->sizes = NULL;
msg->types = NULL;
MPI_Isend(header, 3, MPI_INT, MASTER, 10, MPI_COMM_WORLD,
msg->req);
msg->next = mplrs.outgoing;
mplrs.outgoing = msg;
return;
}
lengths = (int *)malloc(sizeof(int)*listsize); /*allows unconcatenate*/
cobases = (char *)malloc(sizeof(char)*(size+1));/*concatenated + 1 \0*/
for (start=0, i=0, list=mplrs.cobasis_list; list; list=next, i++)
{
next = list->next;
strcpy(cobases+start, (char *)list->data);
lengths[i] = strlen((char *)list->data);
start+=lengths[i];
free(list->data);
free(list);
}
/* final \0 is there */
header[0] = listsize;
header[1] = size+1;
header[2] = mplrs.my_tag;
msg->req = (MPI_Request *)malloc(sizeof(MPI_Request) * 3);
msg->buf = (void **)malloc(sizeof(void *) * 3);
msg->buf[0] = header;
msg->buf[1] = cobases;
msg->buf[2] = lengths;
msg->count = 3;
msg->queue = 0;
msg->tags = NULL;
msg->sizes = NULL;
msg->types = NULL;
mprintf2(("%d: Queued send of %d cobases for L\n",mplrs.rank,listsize));
MPI_Isend(header, 3, MPI_INT, MASTER, 10, MPI_COMM_WORLD, msg->req);
MPI_Isend(cobases, header[1], MPI_CHAR, MASTER, mplrs.my_tag,
MPI_COMM_WORLD, msg->req+1);
MPI_Isend(lengths, listsize, MPI_INT, MASTER, mplrs.my_tag,
MPI_COMM_WORLD, msg->req+2);
mplrs.my_tag++;
msg->next = mplrs.outgoing;
mplrs.outgoing = msg;
mplrs.cobasis_list = NULL;
}
/* dest is a string in a buffer with size *size.
* Append src and a newline to the string, realloc()ing as necessary,
* returning the new pointer and updating size.
*/
char *append_out(char *dest, int *size, const char *src)
{
int len1 = strlen(dest);
int len2 = strlen(src);
int newsize = *size;
char *newp = dest;
if (len1 + len2 + 2 > *size)
{
newsize = newsize<<1;
while ((newsize < len1+len2+2) && newsize)
newsize = newsize<<1;
if (!newsize)
newsize = len1+len2+2;
newp = (char *)realloc(dest, sizeof(char) * newsize);
if (!newp)
{
newsize = len1+len2+2;
newp = (char *)realloc(dest, sizeof(char) * newsize);
if (!newp)
{
printf("%d: Error no memory (%d)\n",mplrs.rank,
newsize);
/* MPI_Finalize(); */
exit(2);
}
}
*size = newsize;
}
strncat(newp, src, len2);
newp[len1+len2]='\n';
newp[len1+len2+1]='\0';
return newp;
}
/************
* consumer *
************/
int mplrs_consumer(void)
{
int i;
initial_print(); /* print version and other information */
/* initialize MPI_Requests and 3*int buffers for incoming messages */
consumer.prodreq = (MPI_Request*)malloc(sizeof(MPI_Request)*mplrs.size);
consumer.prodibf = (int *)malloc(sizeof(int)*3*mplrs.size);
consumer.num_producers = mplrs.size - 2;
for (i=0; i<mplrs.size; i++)
{
if (i==CONSUMER)
continue;
MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
MPI_COMM_WORLD, consumer.prodreq+i);
}
while (consumer.num_producers>0 || consumer.incoming ||
consumer.waiting_initial)
{
/* check if someone is trying to send us output */
/* if so, queue any incoming messages */
consumer_start_incoming();
/* check for completed message to process */
consumer_proc_messages();
/* check signals */
mplrs_handlesigs();
}
mprintf2(("C: getting stats and exiting\n"));
for (i=0; i<mplrs.size; i++)
{
if (i==CONSUMER || i==MASTER)
continue;
recv_counting_stats(i);
}
free(consumer.prodreq);
free(consumer.prodibf);
consumer.prodreq = NULL; consumer.prodibf = NULL;
if (consumer.checkpoint) /*not really finished, coordinate with master*/
return consumer_checkpoint();
recv_master_stats(); /* gets stats on size of L, etc */
final_print();
MPI_Finalize();
return 0;
}
/* check if anyone is trying to send us their output */
/* if master is trying, probably means we're going to checkpoint */
/* in any case, queue any incoming messages */
void consumer_start_incoming(void)
{
int i;
int flag;
for (i=0; i<mplrs.size; i++)
{
/* don't check ourself and workers that have exited */
if (i==CONSUMER || consumer.prodreq[i] == MPI_REQUEST_NULL)
continue;
MPI_Test(consumer.prodreq+i, &flag, MPI_STATUS_IGNORE);
if (!flag) /* not incoming, check next */
continue;
mprintf3(("C: received message from %d\n",i));
if (i==MASTER && consumer.prodibf[0]==CHECKFLAG)
{ /* start checkpoint */
consumer.checkpoint = 1;
mprintf(("*Checkpointing\n"));
continue;
}
else if (i==MASTER && consumer.prodibf[0]==RESTARTFLAG)
{
/* get counting stats for restart */
recv_counting_stats(MASTER);
consumer.waiting_initial = 0;
mprintf(("C: Restarted\n"));
/* master may restart and later checkpoint */
MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
MPI_COMM_WORLD, consumer.prodreq+i);
continue;
}
if (consumer.prodibf[3*i]<=0 && consumer.prodibf[3*i+1]<=0)
{
/* producer i has finished and will exit */
consumer.num_producers--;
mprintf(("C: Producer %d reported exiting, %d left.\n",
i, consumer.num_producers));
continue;
}
/* otherwise, we have the normal situation -- the producer
* wants to send us some output.
*/
consumer.incoming =
consumer_queue_incoming(consumer.prodibf+3*i, i);
MPI_Irecv(consumer.prodibf+(i*3), 3, MPI_INT, i, 7,
MPI_COMM_WORLD, consumer.prodreq+i);
}
}
/* target has sent us this header. Queue the corresponding receive and
* return the new value of consumer.incoming.
*/
msgbuf *consumer_queue_incoming(int *header, int target)
{
msgbuf *curhead = consumer.incoming;
msgbuf *newmsg = (msgbuf *)malloc(sizeof(msgbuf));
newmsg->req = (MPI_Request *)malloc(sizeof(MPI_Request)*2);
newmsg->buf = (void **)malloc(sizeof(void *));
newmsg->buf[0] = (char *)malloc(sizeof(char)*(header[1]+1));
newmsg->count = 1;
newmsg->target = target;
newmsg->next = curhead;
newmsg->queue = 0;
newmsg->tags = NULL;
newmsg->sizes = NULL;
newmsg->types = NULL;
newmsg->data = header[0]; /* bound for stdout or output file */
/* get my_tag from producer via header[2] to uniquely identify msg */
MPI_Irecv(newmsg->buf[0], header[1]+1, MPI_CHAR, target, header[2],
MPI_COMM_WORLD, newmsg->req);
mprintf3(("C: Receiving from %d (%d,%d,%d)",target,header[0],header[1],
header[2]));
return newmsg;
}
/* check our incoming messages, process and remove anything that
* has completed
*/
void consumer_proc_messages(void)
{
msgbuf *msg, *prev=NULL, *next;
int i,len;
for (msg=consumer.incoming; msg; msg=next)
{
next=msg->next;
if (outgoing_msgbuf_completed(msg))
{
if (consumer.waiting_initial &&
msg->target != INITIAL)
continue;
if (msg->data!=0)
fprintf(consumer.output, "%s",
(char*)msg->buf[0]);
/* we wait on all other output until we've printed
* the initial output containing ...begin\n
* to ensure pretty output, if this is the begin,
* flip the flag.
*/
if (consumer.waiting_initial)
{
len=strlen((char *)msg->buf[0])-5;
for (i=0; i<len; i++)
if (!strncmp("begin\n",
(char*)msg->buf[0]+i,6))
{
mprintf3(("C: found begin\n"));
phase1_print();
consumer.waiting_initial = 0;
break;
}
}
free_msgbuf(msg);
if (prev)
prev->next = next;
else
consumer.incoming = next;
continue;
}
prev=msg;
}
}
/* We are checkpointing instead of a normal exit.
* Send counting stats to master, then exit quietly
*/
int consumer_checkpoint(void)
{
int len;
char *str;
char *vol = cprat("", mplrs.Vnum, mplrs.Vden);
send_counting_stats(MASTER);
MPI_Recv(&len, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if (len == -1) /* master produces checkpoint file */
{
MPI_Finalize();
return 0;
}
fprintf(consumer.output, "*Checkpoint file follows this line\n");
fprintf(consumer.output, "mplrs3\n%lu %lu %lu %lu %lu\n%s\n",
mplrs.rays, mplrs.vertices, mplrs.bases, mplrs.facets,
mplrs.intvertices,vol);
free(vol);
while (1)
{
MPI_Recv(&len, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if (len<0)
break;
str = (char*)malloc(sizeof(char)*len);
MPI_Recv(str, len, MPI_CHAR, MASTER, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
fprintf(consumer.output,"%s\n",str);
free(str);
}
fprintf(consumer.output,"*Checkpoint finished above this line\n");
MPI_Finalize();
return 0;
}
/* check whether this outgoing msgbuf has completed.
* If the msg is queued (msg->queue == 1),
* then if the first part has completed, send the remaining parts
* Don't use with *queued* incoming msgbuf.
*/
inline int outgoing_msgbuf_completed(msgbuf *msg)
{
int flag;
int count = msg->count;
int i;
if (msg->queue != 1)
{
MPI_Testall(count, msg->req, &flag, MPI_STATUSES_IGNORE);
return flag;
}
MPI_Test(msg->req, &flag, MPI_STATUS_IGNORE);
if (!flag)
return flag;
/* first completed, send the rest of the queued send */
mprintf3(("%d: Sending second part of queued send to %d\n",
mplrs.rank, msg->target));
for (i=1; i<count; i++)
{
if (msg->buf[i])
MPI_Isend(msg->buf[i], msg->sizes[i], msg->types[i],
msg->target, msg->tags[i], MPI_COMM_WORLD,
msg->req+i);
}
msg->queue = 0;
return 0;
}
inline void free_msgbuf(msgbuf *msg)
{
int i;
for (i=0; i<msg->count; i++)
free(msg->buf[i]);
free(msg->buf);
free(msg->req);
free(msg->tags);
free(msg->sizes);
free(msg->types);
free(msg);
return;
}
outlist *reverse_list(outlist* head)
{
outlist * last = head, * new_head = NULL;
while(last)
{
outlist * tmp = last;
last = last->next;
tmp->next = new_head;
new_head = tmp;
}
return new_head;
}
/* send stats on size of L, etc */
void send_master_stats(void)
{
unsigned long stats[4] = {master.tot_L, master.num_empty, 0, 0};
MPI_Send(stats, 4, MPI_UNSIGNED_LONG, CONSUMER, 1, MPI_COMM_WORLD);
return;
}
/* get master stats on size of L, etc */
void recv_master_stats(void)
{
unsigned long stats[4] = {0,0,0,0};
MPI_Recv(stats, 4, MPI_UNSIGNED_LONG, MASTER, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
master.tot_L = stats[0];
master.num_empty = stats[1];
return;
}
/* send stats to target for final print */
void send_counting_stats(int target)
{
char *vol = NULL;
if (mplrs.facets>0)
vol = cprat("", mplrs.Vnum, mplrs.Vden);
else
{
vol = (char *)malloc(sizeof(char)*2);
vol[0] = '0'; vol[1] = '\0';
}
unsigned long stats[6] = {mplrs.rays, mplrs.vertices, mplrs.bases,
mplrs.facets, mplrs.intvertices,
strlen(vol)+1};
MPI_Send(stats, 6, MPI_UNSIGNED_LONG, target, 1, MPI_COMM_WORLD);
MPI_Send(vol, stats[5], MPI_CHAR, target, 1, MPI_COMM_WORLD);
free(vol);
return;
}
/* gets counting stats from target */
void recv_counting_stats(int target)
{
char *vol;
unsigned long stats[6];
MPI_Recv(stats, 6, MPI_UNSIGNED_LONG, target, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
mplrs.rays+=stats[0];
mplrs.vertices+=stats[1];
mplrs.bases+=stats[2];
mplrs.facets+=stats[3];
mplrs.intvertices+=stats[4];
vol = (char*)malloc(sizeof(char)*stats[5]);
MPI_Recv(vol, stats[5], MPI_CHAR, target, 1, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
plrs_readrat(mplrs.Tnum, mplrs.Tden, vol);
copy(mplrs.tN, mplrs.Vnum); copy(mplrs.tD, mplrs.Vden);
linrat(mplrs.tN, mplrs.tD, 1L, mplrs.Tnum, mplrs.Tden,
1L, mplrs.Vnum, mplrs.Vden);
free(vol);
return;
}
/* do the initial print */
void initial_print(void)
{
fprintf(consumer.output, "*mplrs:%s%s(%s)%d processes\n%s\n",
TITLE,VERSION,ARITH,mplrs.size,AUTHOR);
fprintf(consumer.output, "*Input taken from %s\n",
mplrs.input_filename);
fprintf(consumer.output, "*Starting depth of %d maxcobases=%d ",
master.initdepth, master.maxcobases);
fprintf(consumer.output, "maxdepth=%d lmin=%d lmax=%d scale=%d\n",
master.maxdepth, master.lmin,
master.lmax, master.scalec);
if (mplrs.countonly)
fprintf(consumer.output, "*countonly\n");
if (consumer.output==stdout)
return;
printf("*mplrs:%s%s(%s)%d processes\n%s\n",TITLE,VERSION,ARITH,
mplrs.size,AUTHOR);
printf("*Input taken from %s\n",mplrs.input_filename);
printf("*Output written to: %s\n",consumer.output_filename);
printf("*Starting depth of %d maxcobases=%d ", master.initdepth,
master.maxcobases);
printf("maxdepth=%d lmin=%d lmax=%d scale=%d\n", master.maxdepth,
master.lmin, master.lmax, master.scalec);
if (mplrs.countonly)
printf("*countonly\n");
}
/* do the "*Phase 1 time: " print */
inline void phase1_print(void)
{
timeval cur;
gettimeofday(&cur, NULL);
printf("*Phase 1 time: %ld seconds.\n",cur.tv_sec-mplrs.start.tv_sec);
return;
}
/* do the final print */
void final_print(void)
{
timeval end;
char *vol=NULL;
gettimeofday(&end, NULL);
fprintf(consumer.output, "end\n");
printf("*Total number of jobs: %lu, L became empty %lu times\n", master.tot_L, master.num_empty);
if (mplrs.facets>0)
{
vol = cprat("*Volume=",mplrs.Vnum,mplrs.Vden);
fprintf(consumer.output,"%s\n",vol);
fprintf(consumer.output,"*Totals: facets=%lu bases=%lu\n",
mplrs.facets, mplrs.bases);
}
else
fprintf(consumer.output, "*Totals: vertices=%lu rays=%lu bases=%lu integer-vertices=%lu\n",
mplrs.vertices,mplrs.rays,mplrs.bases,mplrs.intvertices);
fprintf(consumer.output, "*Elapsed time: %ld seconds.\n",
end.tv_sec - mplrs.start.tv_sec);
if (consumer.output_filename == NULL)
return;
if (mplrs.facets>0)
{
printf("%s\n", vol);
printf("*Totals: facets=%lu bases=%lu\n", mplrs.facets,
mplrs.bases);
free(vol);
}
else
printf("*Totals: vertices=%lu rays=%lu bases=%lu integer-vertices=%lu\n",
mplrs.vertices,mplrs.rays,mplrs.bases,mplrs.intvertices);
printf("*Elapsed time: %ld seconds.\n", end.tv_sec - mplrs.start.tv_sec);
}
void post_output(const char *type, const char *data)
{
outlist *out = (outlist *)malloc(sizeof(outlist));
out->type = dupstr(type);
out->data = dupstr(data);
out->next = mplrs.output_list;
mplrs.output_list = out;
if (mplrs.outnum++ > mplrs.maxbuf) /* buffer <maxbuf output lines */
{
process_output(); /* before starting a flush */
clean_outgoing_buffers();
}
}
/* strdup */
inline char *dupstr(const char *str)
{
char *buf = (char *)malloc(sizeof(char)*(strlen(str)+1));
strcpy(buf,str);
return buf;
}