-
Notifications
You must be signed in to change notification settings - Fork 0
/
program.c
266 lines (229 loc) · 8.57 KB
/
program.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Printed to stderr when the number of command-line arguments is wrong.
const char *USAGESTR =
    "USAGE: ./myprogram -d <directoryName> -n <#ofthreads>\n";
// Printed to stderr when the flags or the thread count are not acceptable.
const char *ERRORSTR =
    "ERROR: Invalid arguments.\n";
// One entry of the shared word table.
typedef struct data{
    // the unique word stored in this entry
    char* uniqueword;
    // names of the files that contain this unique word
    char** files;
    // number of slots of files[] already in use (index of the next free slot)
    int touchcount;
}data;
// words[] holds the data entries and is filled by multiple threads
data* words;
// index of the first unused entry of words[] (equals the number of stored words)
int emptywordsindex = 0;
// current capacity of words[]; the initial array has 8 entries
int arraysize = 8;
// folder the program works on; it is prepended to the file names in each thread
char* folderName;
// names of all the txt files in the directory (acts as a task queue)
char** textfiles;
// counts the txt files that were found; decremented each time a thread takes one
int txtCounter = 0;
// total number of txt files, kept for printing purposes
int totalFileAmount = 0;
// this mutex serializes the distribution of txt files to the threads
pthread_mutex_t txtfilemutex = PTHREAD_MUTEX_INITIALIZER;
// this mutex serializes writes to words[]
pthread_mutex_t wordsarraymutex = PTHREAD_MUTEX_INITIALIZER;
// mutex associated with the per-word file lists (files[] / touchcount)
pthread_mutex_t filenamemutex = PTHREAD_MUTEX_INITIALIZER;

// isExist returns the index of `word` inside words[], or -1 when the word is
// not stored (also when `word` or the table itself is NULL).
// Only the first emptywordsindex entries are inspected: slots past that point
// are unused and may lie outside the allocation when the table is full, so
// they must never be read.
// The function takes no lock itself; call it while holding wordsarraymutex if
// other threads may be adding words at the same time.
int isExist(char* word){
    if(words == NULL || word == NULL){
        return -1;
    }
    for(int i = 0; i < emptywordsindex; i++){
        const char *stored = words[i].uniqueword;
        if(stored != NULL && strcmp(stored, word) == 0){
            return i;
        }
    }
    return -1;
}
void processfile(char* filename){
// tokenize the file (word by word)
// add unique elements to the data array
// if array size is not enough; reallocate it with double of its current size
// notify the user about which file is handled by which thread
fprintf(stdout,"MAIN THREAD: Assigned \"%s\" to worker thread %ld.\n",filename,pthread_self());
// buffer that contains file's content
char *buffer ;
long content_length;
char* filePath = malloc(strlen(folderName)*sizeof(char*) +strlen(filename)*sizeof(char*)+1 );
snprintf(filePath,strlen(folderName)*sizeof(char*) +strlen(filename)*sizeof(char*) ,"%s/%s",folderName,filename);
// doing the tokenizations and assigning unique words
FILE* fp = fopen(filePath,"rb");
free(filePath);
if(fp == NULL){
perror(NULL);
exit(EXIT_FAILURE);
}
fseek(fp,0,SEEK_END);
// get the content length
content_length =ftell(fp);
fseek(fp,0,SEEK_SET);
buffer = malloc(content_length +1); // +1 for null termination
if (buffer){
// succesfully allocated
fread(buffer,1,content_length,fp);
// null terminate the buffer
buffer[content_length] = '\0';
}
fclose(fp);
if(buffer){
// savepointer for strtok_r
char *savepointer;
char *token;
int index =0 ;
for(token = strtok_r(buffer," \n",&savepointer) ; token != NULL ; token = strtok_r(NULL," \n",&savepointer)){
// for each token ,check if its exists on the global array
index = isExist(token);
if(index != -1){
// the token already exists in the array
// TODO: add file name to the data's file arary
fprintf(stdout,"The word \"%s\" has already located at index %d.\n",token,index);
pthread_mutex_lock(&filenamemutex);
words[index].files[words[index].touchcount] = malloc((strlen(filename)+1) * sizeof (char*));
words[index].files[words[index].touchcount][strlen(filename)] = '\0';
strcpy(words[index].files[words[index].touchcount], filename);
words[index].touchcount++;
pthread_mutex_unlock(&filenamemutex);
continue;
}else{
pthread_mutex_lock(&wordsarraymutex);
// check if the global array is full
if(emptywordsindex == arraysize){
// if so then reallocate the array's size
arraysize *= 2;
words = realloc(words,arraysize*(sizeof(data)));
if(words == NULL){
perror("error while re-allocating the words array");
exit(EXIT_FAILURE);
}
fprintf(stdout,"THREAD %ld: Re-allocated array of %d pointers.\n",pthread_self(),arraysize);
}
words[emptywordsindex].files = calloc(totalFileAmount,sizeof(char*));
words[emptywordsindex].files[words[emptywordsindex].touchcount] = calloc((strlen(filename)+1) , sizeof (char*));
words[emptywordsindex].files[words[emptywordsindex].touchcount][strlen(filename)] = '\0';
strcpy(words[emptywordsindex].files[words[emptywordsindex].touchcount], filename);
words[emptywordsindex].touchcount++;
words[emptywordsindex].uniqueword = malloc((strlen(token) + 1 )*sizeof(char*));
words[emptywordsindex].uniqueword[strlen(token)] = '\0';
strcpy(words[emptywordsindex].uniqueword, token);
printf("THREAD %ld: Added the word \"%s\" at index %d.\n",pthread_self(),token,emptywordsindex);
emptywordsindex++;
pthread_mutex_unlock(&wordsarraymutex);
}
}
}
free(buffer);
}
// Worker thread entry point: repeatedly takes the first file name of the
// textfiles queue and hands it to processfile until the queue is empty.
// `args` is unused; the function always returns NULL.
// The queue state (txtCounter and the textfiles slots) is only touched while
// txtfilemutex is held, and the mutex is released on every path, including
// the one where the queue turns out to be empty.
void* threadroutine(void* args){
    (void)args;
    for(;;){
        pthread_mutex_lock(&txtfilemutex);
        if(txtCounter <= 0){
            // nothing left: release the lock before leaving so that the
            // other workers are not blocked forever
            pthread_mutex_unlock(&txtfilemutex);
            break;
        }
        char *filename = textfiles[0];
        // shift the remaining names forward by moving the pointers; the
        // strings themselves are never copied between slots
        for(int i = 0; i < txtCounter - 1; i++){
            textfiles[i] = textfiles[i+1];
        }
        // park the taken name right behind the pending ones: the array keeps
        // every original pointer exactly once and later shifts no longer
        // touch this slot
        textfiles[txtCounter - 1] = filename;
        txtCounter--;
        pthread_mutex_unlock(&txtfilemutex);
        processfile(filename);
    }
    fflush(stdout);
    return NULL;
}
int main(int argc, char *argv[]){
// check if argument amount is correct
if(argc != 5){
fprintf(stderr,"%s",USAGESTR);
return 1;
}
// check if thread amount is positive or not
// first convert 5th argument to integer
// also check if flags are in position
int threadAmount;
threadAmount = atoi(argv[4]);
if(threadAmount < 1 || (strcmp(argv[1],"-d") != 0) || (strcmp(argv[3],"-n")) != 0){
fprintf(stderr,"%s",ERRORSTR);
return 1;
}
// Open the directory
DIR* dir = opendir(argv[2]);
if(!dir){
perror(NULL);
return 1;
}
// directory exists
folderName = malloc((strlen(argv[2])+1)*(sizeof(char*)));
strcpy(folderName,argv[2]);
// null terminate the folder name
folderName[strlen(argv[2])] = '\0';
// this struct will hold directory entries
struct dirent *ent;
// traverse the directory
while((ent = readdir(dir)) != NULL){
// check if the file has correct extension (.txt)
if(strcmp(ent->d_name + strlen(ent->d_name)-4 , ".txt") == 0){
// a txt file found
txtCounter++;
}
}
// check if there is no txt file under the dir
if(txtCounter == 0){
fprintf(stderr,"There is no txt file under %s\n",folderName);
return 1;
}
// holder for printing purposes
totalFileAmount = txtCounter;
// rewind the directory to iterate it again
rewinddir(dir);
textfiles = malloc(txtCounter*sizeof(char*));
int i =0;
while((ent = readdir(dir)) != NULL){
if(strcmp(ent->d_name + strlen(ent->d_name)-4 , ".txt") == 0){
textfiles[i] = malloc((strlen(ent->d_name)+1)*sizeof(char*));
textfiles[i][strlen(ent->d_name)] = '\0';
strcpy(textfiles[i],ent->d_name);
i++;
}
}
// allocate inital memory for the words array
// make the words array null initialized
words =(data*) calloc(arraysize,(sizeof(data)));
fprintf(stdout,"MAIN THREAD: Allocated initial array of 8 pointers.\n");
// create array of threads
pthread_t workers[threadAmount];
for(i = 0 ; i<threadAmount; i++){
// creating joinable thread because we will wait for them to finish
if(pthread_create(&workers[i],NULL,&threadroutine,NULL) != 0){
// error occured while creating the thread
perror("error while creating thread");
return 1;
}
}
// wait for threads to complete
for(i = 0 ; i<threadAmount; i++){
// creating joinable thread because we will wait for them to finish
if(pthread_join(workers[i],NULL) != 0){
// error occured while joining the thread
perror("error while joining thread");
return 1;
}
}
// program has come to the end print the message
fprintf(stdout,"MAIN THREAD: All done (succesfully read %d words with %d threads from %d files).\n",emptywordsindex,atoi(argv[4]),totalFileAmount);
// free allocated spaces
// actually there is no need to free them since they are global variables and handled by the program itself, but I do it for just in case
free(textfiles);
free(folderName);
free(words);
}