#include<add.h>
 
-pthread_t adding_thread;
+pthread_t *adding_threads;
 pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 //static int setup_adding() {
 //}
 
 static struct hash_map *add_queue;
+static size_t add_queue_index;
 static struct torrent *current_torrent;
 
 static int add_find_all();
+static void *add_hash(void*);
 static int add_queue_resize();
 static int add_to_queue();
+static struct file *get_next();
 static int ftw_helper(const char*,const struct stat*,int);
 
 int add() {
                if(add_find_all(session.torrents[i])<0) { return -1; }
        }
 
-       for(size_t i=0;i<add_queue->size;i++) {
-               if(add_queue->map[i]!=NULL) {
-                       log_info("to add: %s\n",((struct file*)add_queue->map[i])->path);
+       add_queue_index = 0;
+
+       pthread_t *adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads);
+       for(size_t i=0;i<global_options.worker_threads;i++) {
+               if(pthread_create(&(adding_threads[i]),NULL,&add_hash,NULL)!=0) {
+                       perror("pthread_create");
+                       return -1;
                }
        }
 
+       for(size_t i=0;i<global_options.worker_threads;i++) {
+               if(pthread_join(adding_threads[i],NULL)<0) { return -1; }
+       }
+
+       free(adding_threads);
        hashmap_free(add_queue);
+
        return 1;
 }
 
+static struct file *get_next() {
+       struct file *p;
+       while(add_queue_index<add_queue->size) {
+               p = add_queue->map[add_queue_index];
+               add_queue_index++;
+
+               if(p!=NULL) { return p; }
+       }
+
+       return NULL;
+}
+
+static void *add_hash(void *unused) {
+       struct file *p;
+
+       while(1) {
+               pthread_testcancel();
+               pthread_mutex_lock(&adding_mutex);
+               p = get_next();
+               pthread_mutex_unlock(&adding_mutex);
+
+               if(NULL==p) { return NULL; }
+               log_info("to add: %s\n",p->path);
+       }
+
+       return NULL;
+}
+
+
 static int add_find_all(struct torrent *p) {
        log_info(ADD_MESSAGE_ADDING_TORRENT,p->root,p->name);
        current_torrent = p;
        size_t len;
 
        len = strlen(to_add->path);
-       p = hashmap_find(add_queue,to_add->path,len);
-       if(p!=NULL) { return 0; }
 
        while((ret = hashmap_insert(add_queue,to_add->path,len,to_add))<=0) {
                if(ret<0) { return -1; }
+               
+               p = hashmap_find(add_queue,to_add->path,len);
+               if(p!=NULL) {
+                       if(strcmp(p->path,to_add->path)==0) { return 1; }
+               }
+
                if(add_queue_resize(add_queue->size<<1)<0) { return -1; }
        }
        
 
 static void log_print_prefix(FILE*);
 
 static struct log_entry *log_dequeue() {
+       // requires logging_mutex to be locked
        struct log_entry *p;
 
-       pthread_mutex_lock(&logging_mutex);
-
        if(NULL==helper.start) {
                p = NULL;
        } else {
                helper.start = p->next;
        }
 
-       pthread_mutex_unlock(&logging_mutex);
-
        return p;
 }
 
 }
 
 void log_print() {
+       // requires logging_mutex to be locked
        struct log_entry *p = log_dequeue();
        if(NULL!=p) {
                log_print_prefix(p->out_stream);
                fputs(p->buf,p->out_stream);
-               helper.start = p->next;
        }
 }
 
        log_info(LOG_THREAD_START_MESSAGE);
        while(1) {
                pthread_testcancel();
+               
+               pthread_mutex_lock(&logging_mutex);
                log_print();
+               pthread_mutex_unlock(&logging_mutex);
        }
 
        pthread_cleanup_pop(1);