]> infiniteadaptability.org Git - seeder/commitdiff
...
authoralex <[email protected]>
Fri, 22 Oct 2021 01:11:59 +0000 (18:11 -0700)
committeralex <[email protected]>
Fri, 22 Oct 2021 01:11:59 +0000 (18:11 -0700)
src/add.c
src/log.c
src/main.c

index 8f263417f83d65b53874f32df2f2884300ce16da..3beb3df199b68f64b267e1545ef5f644afbed0ca 100644 (file)
--- a/src/add.c
+++ b/src/add.c
@@ -1,6 +1,6 @@
 #include<add.h>
 
-pthread_t adding_thread;
+pthread_t *adding_threads;
 pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 //static int setup_adding() {
@@ -13,11 +13,14 @@ pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER;
 //}
 
 static struct hash_map *add_queue;
+static size_t add_queue_index;
 static struct torrent *current_torrent;
 
 static int add_find_all();
+static void *add_hash(void*);
 static int add_queue_resize();
 static int add_to_queue();
+static struct file *get_next();
 static int ftw_helper(const char*,const struct stat*,int);
 
 int add() {
@@ -27,16 +30,55 @@ int add() {
                if(add_find_all(session.torrents[i])<0) { return -1; }
        }
 
-       for(size_t i=0;i<add_queue->size;i++) {
-               if(add_queue->map[i]!=NULL) {
-                       log_info("to add: %s\n",((struct file*)add_queue->map[i])->path);
+       add_queue_index = 0;
+
+       pthread_t *adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads);
+       for(size_t i=0;i<global_options.worker_threads;i++) {
+               if(pthread_create(&(adding_threads[i]),NULL,&add_hash,NULL)!=0) {
+                       perror("pthread_create");
+                       return -1;
                }
        }
 
+       for(size_t i=0;i<global_options.worker_threads;i++) {
+               if(pthread_join(adding_threads[i],NULL)<0) { return -1; }
+       }
+
+       free(adding_threads);
        hashmap_free(add_queue);
+
        return 1;
 }
 
+static struct file *get_next() {
+       struct file *p;
+       while(add_queue_index<add_queue->size) {
+               p = add_queue->map[add_queue_index];
+               add_queue_index++;
+
+               if(p!=NULL) { return p; }
+       }
+
+       return NULL;
+}
+
+static void *add_hash(void *unused) {
+       struct file *p;
+
+       while(1) {
+               pthread_testcancel();
+               pthread_mutex_lock(&adding_mutex);
+               p = get_next();
+               pthread_mutex_unlock(&adding_mutex);
+
+               if(NULL==p) { return NULL; }
+               log_info("to add: %s\n",p->path);
+       }
+
+       return NULL;
+}
+
+
 static int add_find_all(struct torrent *p) {
        log_info(ADD_MESSAGE_ADDING_TORRENT,p->root,p->name);
        current_torrent = p;
@@ -84,11 +126,15 @@ static int add_to_queue(struct file *to_add) {
        size_t len;
 
        len = strlen(to_add->path);
-       p = hashmap_find(add_queue,to_add->path,len);
-       if(p!=NULL) { return 0; }
 
        while((ret = hashmap_insert(add_queue,to_add->path,len,to_add))<=0) {
                if(ret<0) { return -1; }
+               
+               p = hashmap_find(add_queue,to_add->path,len);
+               if(p!=NULL) {
+                       if(strcmp(p->path,to_add->path)==0) { return 1; }
+               }
+
                if(add_queue_resize(add_queue->size<<1)<0) { return -1; }
        }
        
index 59417f71d9f2d315a9dd453cb1805fde5d0f58a0..c6232f76ebef1c4af8ab72680e357bb6c672bde5 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -10,10 +10,9 @@ static void log_print();
 static void log_print_prefix(FILE*);
 
 static struct log_entry *log_dequeue() {
+       // requires logging_mutex to be locked
        struct log_entry *p;
 
-       pthread_mutex_lock(&logging_mutex);
-
        if(NULL==helper.start) {
                p = NULL;
        } else {
@@ -21,8 +20,6 @@ static struct log_entry *log_dequeue() {
                helper.start = p->next;
        }
 
-       pthread_mutex_unlock(&logging_mutex);
-
        return p;
 }
 
@@ -77,11 +74,11 @@ static void log_flush(void *p) {
 }
 
 void log_print() {
+       // requires logging_mutex to be locked
        struct log_entry *p = log_dequeue();
        if(NULL!=p) {
                log_print_prefix(p->out_stream);
                fputs(p->buf,p->out_stream);
-               helper.start = p->next;
        }
 }
 
@@ -149,7 +146,10 @@ void *log_poll(void *p) {
        log_info(LOG_THREAD_START_MESSAGE);
        while(1) {
                pthread_testcancel();
+               
+               pthread_mutex_lock(&logging_mutex);
                log_print();
+               pthread_mutex_unlock(&logging_mutex);
        }
 
        pthread_cleanup_pop(1);
index 6d833bb8e5a50d82605aaa1e95c892eb1f499b24..ded065cfced055dd7fbad7c85ed0d5b86e5cb271 100644 (file)
@@ -8,7 +8,7 @@ int main(int argc, char **argv) {
        if(add()<0) { return EXIT_FAILURE; }
 
        log_err("this is a test %d %s\n",10,"what?");
-       while(1) { }
+       //while(1) { }
 
        return EXIT_FAILURE;
 }