#include<add.h>
-pthread_t adding_thread;
+pthread_t *adding_threads;
pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER;
//static int setup_adding() {
//}
static struct hash_map *add_queue;
+static size_t add_queue_index;
static struct torrent *current_torrent;
static int add_find_all();
+static void *add_hash(void*);
static int add_queue_resize();
static int add_to_queue();
+static struct file *get_next();
static int ftw_helper(const char*,const struct stat*,int);
int add() {
if(add_find_all(session.torrents[i])<0) { return -1; }
}
- for(size_t i=0;i<add_queue->size;i++) {
- if(add_queue->map[i]!=NULL) {
- log_info("to add: %s\n",((struct file*)add_queue->map[i])->path);
+ add_queue_index = 0;
+
+ pthread_t *adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads);
+ for(size_t i=0;i<global_options.worker_threads;i++) {
+ if(pthread_create(&(adding_threads[i]),NULL,&add_hash,NULL)!=0) {
+ perror("pthread_create");
+ return -1;
}
}
+ for(size_t i=0;i<global_options.worker_threads;i++) {
+ if(pthread_join(adding_threads[i],NULL)<0) { return -1; }
+ }
+
+ free(adding_threads);
hashmap_free(add_queue);
+
return 1;
}
+static struct file *get_next() {
+ struct file *p;
+ while(add_queue_index<add_queue->size) {
+ p = add_queue->map[add_queue_index];
+ add_queue_index++;
+
+ if(p!=NULL) { return p; }
+ }
+
+ return NULL;
+}
+
+static void *add_hash(void *unused) {
+ struct file *p;
+
+ while(1) {
+ pthread_testcancel();
+ pthread_mutex_lock(&adding_mutex);
+ p = get_next();
+ pthread_mutex_unlock(&adding_mutex);
+
+ if(NULL==p) { return NULL; }
+ log_info("to add: %s\n",p->path);
+ }
+
+ return NULL;
+}
+
+
static int add_find_all(struct torrent *p) {
log_info(ADD_MESSAGE_ADDING_TORRENT,p->root,p->name);
current_torrent = p;
size_t len;
len = strlen(to_add->path);
- p = hashmap_find(add_queue,to_add->path,len);
- if(p!=NULL) { return 0; }
while((ret = hashmap_insert(add_queue,to_add->path,len,to_add))<=0) {
if(ret<0) { return -1; }
+
+ p = hashmap_find(add_queue,to_add->path,len);
+ if(p!=NULL) {
+ if(strcmp(p->path,to_add->path)==0) { return 1; }
+ }
+
if(add_queue_resize(add_queue->size<<1)<0) { return -1; }
}
static void log_print_prefix(FILE*);
static struct log_entry *log_dequeue() {
+ // requires logging_mutex to be locked
struct log_entry *p;
- pthread_mutex_lock(&logging_mutex);
-
if(NULL==helper.start) {
p = NULL;
} else {
helper.start = p->next;
}
- pthread_mutex_unlock(&logging_mutex);
-
return p;
}
}
void log_print() {
+ // requires logging_mutex to be locked
struct log_entry *p = log_dequeue();
if(NULL!=p) {
log_print_prefix(p->out_stream);
fputs(p->buf,p->out_stream);
- helper.start = p->next;
}
}
log_info(LOG_THREAD_START_MESSAGE);
while(1) {
pthread_testcancel();
+
+ pthread_mutex_lock(&logging_mutex);
log_print();
+ pthread_mutex_unlock(&logging_mutex);
}
pthread_cleanup_pop(1);