From: alex Date: Fri, 22 Oct 2021 01:11:59 +0000 (-0700) Subject: ... X-Git-Url: http://git.infiniteadaptability.org/?a=commitdiff_plain;h=7b8f2cf1aadc8317b027ac983b4b6699511d62d1;p=seeder ... --- diff --git a/src/add.c b/src/add.c index 8f26341..3beb3df 100644 --- a/src/add.c +++ b/src/add.c @@ -1,6 +1,6 @@ #include -pthread_t adding_thread; +pthread_t *adding_threads; pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER; //static int setup_adding() { @@ -13,11 +13,14 @@ pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER; //} static struct hash_map *add_queue; +static size_t add_queue_index; static struct torrent *current_torrent; static int add_find_all(); +static void *add_hash(void*); static int add_queue_resize(); static int add_to_queue(); +static struct file *get_next(); static int ftw_helper(const char*,const struct stat*,int); int add() { @@ -27,16 +30,55 @@ int add() { if(add_find_all(session.torrents[i])<0) { return -1; } } - for(size_t i=0;isize;i++) { - if(add_queue->map[i]!=NULL) { - log_info("to add: %s\n",((struct file*)add_queue->map[i])->path); + add_queue_index = 0; + + pthread_t *adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads); + for(size_t i=0;isize) { + p = add_queue->map[add_queue_index]; + add_queue_index++; + + if(p!=NULL) { return p; } + } + + return NULL; +} + +static void *add_hash(void *unused) { + struct file *p; + + while(1) { + pthread_testcancel(); + pthread_mutex_lock(&adding_mutex); + p = get_next(); + pthread_mutex_unlock(&adding_mutex); + + if(NULL==p) { return NULL; } + log_info("to add: %s\n",p->path); + } + + return NULL; +} + + static int add_find_all(struct torrent *p) { log_info(ADD_MESSAGE_ADDING_TORRENT,p->root,p->name); current_torrent = p; @@ -84,11 +126,15 @@ static int add_to_queue(struct file *to_add) { size_t len; len = strlen(to_add->path); - p = hashmap_find(add_queue,to_add->path,len); - if(p!=NULL) { return 0; } while((ret = hashmap_insert(add_queue,to_add->path,len,to_add))<=0) { if(ret<0) { return -1; } + + p = hashmap_find(add_queue,to_add->path,len); + if(p!=NULL) { + if(strcmp(p->path,to_add->path)==0) { return 1; } + } + if(add_queue_resize(add_queue->size<<1)<0) { return -1; } } diff --git a/src/log.c b/src/log.c index 59417f7..c6232f7 100644 --- a/src/log.c +++ b/src/log.c @@ -10,10 +10,9 @@ static void log_print(); static void log_print_prefix(FILE*); static struct log_entry *log_dequeue() { + // requires logging_mutex to be locked struct log_entry *p; - pthread_mutex_lock(&logging_mutex); - if(NULL==helper.start) { p = NULL; } else { @@ -21,8 +20,6 @@ static struct log_entry *log_dequeue() { helper.start = p->next; } - pthread_mutex_unlock(&logging_mutex); - return p; } @@ -77,11 +74,11 @@ static void log_flush(void *p) { } void log_print() { + // requires logging_mutex to be locked struct log_entry *p = log_dequeue(); if(NULL!=p) { log_print_prefix(p->out_stream); fputs(p->buf,p->out_stream); - helper.start = p->next; } } @@ -149,7 +146,10 @@ void *log_poll(void *p) { log_info(LOG_THREAD_START_MESSAGE); while(1) { pthread_testcancel(); + + pthread_mutex_lock(&logging_mutex); log_print(); + pthread_mutex_unlock(&logging_mutex); } pthread_cleanup_pop(1); diff --git a/src/main.c b/src/main.c index 6d833bb..ded065c 100644 --- a/src/main.c +++ b/src/main.c @@ -8,7 +8,7 @@ int main(int argc, char **argv) { if(add()<0) { return EXIT_FAILURE; } log_err("this is a test %d %s\n",10,"what?"); - while(1) { } + //while(1) { } return EXIT_FAILURE; }