Skip to content

Commit

Permalink
I/O for final files unification added, and minor bugfix for large files
Browse files Browse the repository at this point in the history
  • Loading branch information
Kr4is committed Jul 3, 2020
1 parent 3e32c82 commit 0baba1d
Showing 1 changed file with 62 additions and 59 deletions.
121 changes: 62 additions & 59 deletions run_hmm.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ int main (int argc, char **argv)
char mystring[STRINGLEN] = "";
char filename[STRINGLEN+16] = "";
int *obs_seq_len, *start_sequences, *end_sequences;
long int *seq_start_pointers;
long int seq_pointer, num_start_seq, num_final_seq;
long int *seq_start_pointers, *out_pointers, *aa_pointers, *dna_pointers;
long int seq_pointer, out_seq_pointer, aa_seq_pointer, dna_seq_pointer, final_seq_pointer, num_start_seq, num_final_seq;
int next_proc;
int bp_count; /* count the length of each line in input file */

Expand Down Expand Up @@ -188,11 +188,11 @@ int main (int argc, char **argv)

// Create output temp files for each process
sprintf(filename, "%s.out.%d", out_header, myid);
out = fopen(filename, "w");
out = fopen(filename, "w+");
sprintf(filename, "%s.faa.%d", out_header, myid);
aa = fopen(filename, "w");
aa = fopen(filename, "w+");
sprintf(filename, "%s.ffn.%d", out_header, myid);
dna = fopen(filename, "w");
dna = fopen(filename, "w+");

// Open sequences file
fp = fopen(seq_file, "r");
Expand Down Expand Up @@ -254,7 +254,7 @@ int main (int argc, char **argv)
// Rewind file to de top
rewind(fp);

int seqs_len_per_proc = total_seqs_len / num_procs;
long int seqs_len_per_proc = total_seqs_len / num_procs;
int curr_proc = 0;
long int length_accum = 0;

Expand Down Expand Up @@ -341,66 +341,69 @@ int main (int argc, char **argv)
break;
}
}


// Wait for all processes computation
MPI_Barrier(MPI_COMM_WORLD);

out_pointers = (long int *) malloc(num_procs * sizeof(long int));
seq_pointer = ftell(out);
MPI_Allgather(&seq_pointer, 1, MPI_LONG, out_pointers, 1, MPI_LONG, MPI_COMM_WORLD);
rewind(out);

aa_pointers = (long int *) malloc(num_procs * sizeof(long int));
seq_pointer = ftell(aa);
MPI_Allgather(&seq_pointer, 1, MPI_LONG, aa_pointers, 1, MPI_LONG, MPI_COMM_WORLD);
rewind(aa);

dna_pointers = (long int *) malloc(num_procs * sizeof(long int));
seq_pointer = ftell(dna);
MPI_Allgather(&seq_pointer, 1, MPI_LONG, dna_pointers, 1, MPI_LONG, MPI_COMM_WORLD);
rewind(dna);

out_seq_pointer = 0;
aa_seq_pointer = 0;
dna_seq_pointer = 0;
int current_proc;
for(current_proc = 0; current_proc < myid; current_proc++){
out_seq_pointer += out_pointers[current_proc];
aa_seq_pointer += aa_pointers[current_proc];
dna_seq_pointer += dna_pointers[current_proc];
}

MPI_File final_out, final_aa, final_dna;

sprintf(filename, "%s.out", out_header);
MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &final_out);
MPI_File_seek(final_out, out_seq_pointer, MPI_SEEK_SET);
while(fgets(mystring, sizeof(mystring), out)) {
MPI_File_write(final_out, mystring, strlen(mystring), MPI_CHAR, MPI_STATUS_IGNORE);
}
MPI_File_close(&final_out);

sprintf(filename, "%s.faa", out_header);
MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &final_aa);
MPI_File_seek(final_aa, aa_seq_pointer, MPI_SEEK_SET);
while(fgets(mystring, sizeof(mystring), aa)) {
MPI_File_write(final_aa, mystring, strlen(mystring), MPI_CHAR, MPI_STATUS_IGNORE);
}
MPI_File_close(&final_aa);

sprintf(filename, "%s.ffn", out_header);
MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &final_dna);
MPI_File_seek(final_dna, dna_seq_pointer, MPI_SEEK_SET);
while(fgets(mystring, sizeof(mystring), dna)) {
MPI_File_write(final_dna, mystring, strlen(mystring), MPI_CHAR, MPI_STATUS_IGNORE);
}
MPI_File_close(&final_dna);

// Close files
fclose(out);
fclose(aa);
fclose(dna);

// Wait for all processes computation
// Wait for all processes file write
MPI_Barrier(MPI_COMM_WORLD);

if(myid == 0){
// Create final output files
FILE *current_file;
int current_proc;
// out file
FILE *final_out;
sprintf(filename, "%s.out", out_header);
final_out = fopen(filename, "w");
for(current_proc = 0; current_proc < num_procs; current_proc++){
sprintf(filename, "%s.out.%d", out_header, current_proc);
current_file = fopen(filename, "r");
while(fgets(mystring, sizeof(mystring), current_file)) {
fprintf(final_out, "%s", mystring);
}
fclose(current_file);
// Remove temp file
remove(filename);
}
fclose(final_out);
// aa file
FILE *final_aa;
sprintf(filename, "%s.faa", out_header);
final_aa = fopen(filename, "w");
for(current_proc = 0; current_proc < num_procs; current_proc++){
sprintf(filename, "%s.faa.%d", out_header, current_proc);
current_file = fopen(filename, "r");
while(fgets(mystring, sizeof(mystring), current_file)) {
fprintf(final_aa, "%s", mystring);
}
fclose(current_file);
// Remove temp file
remove(filename);
}
fclose(final_aa);
// dna file
FILE *final_dna;
sprintf(filename, "%s.ffn", out_header);
final_dna = fopen(filename, "w");
for(current_proc = 0; current_proc < num_procs; current_proc++){
sprintf(filename, "%s.ffn.%d", out_header, current_proc);
current_file = fopen(filename, "r");
while(fgets(mystring, sizeof(mystring), current_file)) {
fprintf(final_dna, "%s", mystring);
}
fclose(current_file);
// Remove temp file
remove(filename);
}
fclose(final_dna);
}

clock_t end = clock();
if(myid == 0){
printf("Clock time used (by %d processes) = %.2f mins\n", num_procs, (end - start) / (60.0 * CLOCKS_PER_SEC));
Expand Down

0 comments on commit 0baba1d

Please sign in to comment.