#define ADD_MESSAGE_ADDING_TORRENT "adding all files in %s to torrent named %s\n"
#define ADD_MESSAGE_ADDED_FILE "added file: %s\n"
#define ADD_MESSAGE_HASH_FILE_FAILED "failed to hash file: %s\n"
+#define ADD_MESSAGE_SHUTDOWN_FAILED "adding threads shutdown failed\n"
#define ADD_QUEUE_INITIAL_SIZE 8
};
void hashmap_clear(struct hash_map*);
-void *hashmap_find(struct hash_map*,const void*,size_t);
+void *hashmap_find(const struct hash_map*,const void*,size_t);
void hashmap_free(struct hash_map*);
int hashmap_init(struct hash_map**,size_t);
int hashmap_insert(struct hash_map*,const void*,size_t,void*);
#define WATCH_MESSAGE_FAILED "failed to start watching\n"
#define WATCH_MESSAGE_START "watching started: %s\n"
#define WATCH_MESSAGE_START_FAILED "failed to start watching %s\n"
+#define WATCH_MESSAGE_SHUTDOWN_FAILED "failed to shutdown watching thread\n"
int watch();
#include<add.h>
static pthread_mutex_t adding_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t *adding_threads;
static struct hash_map *add_queue;
static size_t add_queue_index;
static void add_queue_entry_free(struct add_queue_entry*);
static int add_queue_entry_init(struct add_queue_entry**);
static int add_queue_resize();
+static void add_shutdown();
static int add_to_queue(struct add_queue_entry*);
static struct add_queue_entry *get_next();
static int ftw_helper(const char*,const struct stat*,int);
int add() {
struct torrent *p;
+ adding_threads = NULL;
+
+ if(0!=atexit(&add_shutdown)) {
+ perror("atexit");
+ return -1;
+ }
+
if(hashmap_init(&add_queue,ADD_QUEUE_INITIAL_SIZE)<0) { return -1; }
for(size_t i=0;i<session.torrents.infohashes->size;i++) {
add_queue_index = 0;
- pthread_t *adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads);
+ adding_threads = calloc(sizeof(pthread_t),global_options.worker_threads);
for(size_t i=0;i<global_options.worker_threads;i++) {
if(pthread_create(&(adding_threads[i]),NULL,&add_hash,NULL)!=0) {
perror("pthread_create");
}
}
- for(size_t i=0;i<global_options.worker_threads;i++) {
- if(pthread_join(adding_threads[i],NULL)<0) { return -1; }
- }
-
- free(adding_threads);
- hashmap_free(add_queue);
+ add_shutdown();
return 1;
}
return 1;
}
+static void add_shutdown() {
+ log_info("shutting down adding threads\n"); // REMOVE
+ if(adding_threads!=NULL) {
+ for(size_t i=0;i<global_options.worker_threads;i++) {
+ if(pthread_join(adding_threads[i],NULL)<0) {
+ log_err(ADD_MESSAGE_SHUTDOWN_FAILED);
+ return;
+ }
+ }
+
+ free(adding_threads);
+ hashmap_free(add_queue);
+
+ adding_threads = NULL;
+ }
+
+ log_info("shut down of add threads complete\n"); // REMOVE
+}
+
static int add_to_queue(struct add_queue_entry *to_add) {
struct add_queue_entry *p;
int ret;
}
}
-void *hashmap_find(struct hash_map *p, const void *key, size_t key_size) {
+void *hashmap_find(const struct hash_map *p, const void *key, size_t key_size) {
unsigned char hash[crypto_shorthash_BYTES];
size_t index;
int init(int argc, char **argv) {
int c;
+ /*
+ * Main Initialization:
+ * Order matters here
+ * 1. signal handler
+ * 2. defaults so base state is consistent
+ * 3. logging_setup() so debugging information in later steps
+ * 4. session_setup() so that watch directories can be added to session successfully.
+ */
if(signal_handler()<0) { return -1; }
-
+ if(defaults()<0) { return -1; }
+ if(logging_setup()<0) { return -1; }
if(session_setup()<0) { return -1; }
- if(defaults()<0) { return -1; }
if(opt_load_from_env()<0) { return -1; }
- if(logging_setup()<0) { return -1; }
-
while(1) {
int option_index = 0;
static void log_shutdown() {
int ret_cancel, ret_join;
void *res;
+
+ log_info("shutting down logging thread\n");
ret_cancel = pthread_cancel(logging_thread);
ret_join = pthread_join(logging_thread,&res);
+
+ log_info("shutting down logging thread 2\n");
logging_thread = pthread_self();
if((ret_cancel!=0)||(ret_join!=0)||(res!=PTHREAD_CANCELED)) {
return -1;
}
+ log_info("logging started\n");
+
return 1;
}
static void net_shutdown(void *p) {
struct net_info *info = (struct net_info*)p;
+ close(info->epoll_fd);
close(info->tcp_socket);
close(info->udp_socket);
}
static void net_stop() {
+ log_info("net thread stopping\n");
for(size_t i=0;i<global_options.worker_threads;i++) {
pthread_cancel(net_threads[i]);
}
log_err(NET_MESSAGE_SHUTDOWN_FAILED);
}
}
+ log_info("net thread stopped\n");
}
static void session_clean() {
struct torrent *p;
+ log_info("clearing session\n");
+
for(size_t i=0;i<session.torrents.paths->size;i++) {
p = session.torrents.paths->map[i];
if(p!=NULL) { torrent_free(p); }
hashmap_clear(session.torrents.paths);
hashmap_free(session.torrents.paths);
+
+ log_info("session cleared\n");
}
struct torrent *session_find_torrent(uint8_t *infohash, size_t size) {
return -1;
}
+ log_info("session setup\n");
+
return 1;
}
* call exit explicitly will call functions registered
* with atexit and do appropriate cleanup.
*/
- exit(EXIT_SUCCESS);
+ exit(EXIT_FAILURE);
}
static void shutdown_success() {
#include<watch.h>
+static int watch_fd;
static pthread_t watching_thread;
+static void watch_shutdown();
static void *watch_spawn(void*);
-static int watch_spawn_all(int,const char*,struct tree*);
-static void watch_poll(int);
+static int watch_spawn_all(const char*,struct tree*);
+static void watch_poll();
int watch() {
+ if(0!=atexit(&watch_shutdown)) {
+ perror("atexit");
+ return -1;
+ }
+
if(pthread_create(&watching_thread,NULL,&watch_spawn,NULL)!=0) {
perror("pthread_create");
return -1;
static void *watch_spawn(void *unused) {
struct torrent *p;
- int fd;
- fd = inotify_init();
- if(fd<0) {
+ watch_fd = inotify_init();
+ if(watch_fd<0) {
log_err(WATCH_MESSAGE_FAILED);
perror("inotify_init");
return NULL;
for(size_t i=0;i<session.torrents.infohashes->size;i++) {
p = session.torrents.infohashes->map[i];
if(p!=NULL) {
- if(watch_spawn_all(fd,p->root,p->tree)<0) { return NULL; }
+ if(watch_spawn_all(p->root,p->tree)<0) { return NULL; }
}
}
- watch_poll(fd);
+ watch_poll();
return NULL;
}
#define WATCH_FLAGS IN_CLOSE_WRITE | IN_CREATE | IN_DELETE | IN_MODIFY | IN_MOVED_TO
-static int watch_spawn_all(int fd, const char *root, struct tree *tree) {
+static int watch_spawn_all(const char *root, struct tree *tree) {
struct tree_entry *p;
- if(fd<0) { return -1; }
if(NULL==root) { return -1; }
if(strlen(root)<=0) { return -1; }
if(NULL==tree) { return -1; }
- tree->watch_fd = inotify_add_watch(fd,root,WATCH_FLAGS);
+ tree->watch_fd = inotify_add_watch(watch_fd,root,WATCH_FLAGS);
if(tree->watch_fd<0) {
perror("inotify_add_watch");
log_err(WATCH_MESSAGE_START_FAILED,root);
while(p!=NULL) {
if(p->children!=NULL) {
char *newroot = concat(root,p->name);
- if(watch_spawn_all(fd,newroot,p->children)<0) { return -1; }
+ if(watch_spawn_all(newroot,p->children)<0) { return -1; }
free(newroot);
}
p = p->next;
return 1;
}
-static void watch_poll(int fd) {
+static void watch_poll() {
/* see `man inotify` for explanation of alignment modifiers */
char buf[4096] __attribute__ ((aligned(__alignof__(struct inotify_event))));
const struct inotify_event *event;
ssize_t len;
for(;;) {
- len = read(fd,buf,sizeof(buf));
+ pthread_testcancel();
+
+ len = read(watch_fd,buf,sizeof(buf));
if(len<=0) {
perror("read");
break;
}
}
}
+
+static void watch_shutdown() {
+ log_info("watch thread stopping\n");
+ if(0!=pthread_cancel(watching_thread)) {
+ log_err(WATCH_MESSAGE_SHUTDOWN_FAILED);
+ }
+ close(watch_fd);
+ log_info("watch thread stopped\n");
+}
void run_and_exit_successfully(char *const opts[]) {
int status;
+ printf("running\n");
pid_t child_pid = run(opts);
// wait 5 seconds for startup
+ printf("sleeping\n");
sleep(5);
// verify still running
+ printf("verify process running\n");
assert(0==waitpid(child_pid,&status,WNOHANG));
// SIGINT to shutdown
+ printf("send SIGINT\n");
assert(0==kill(child_pid,SIGINT));
// verify child process exited successfully
+ printf("waiting for child process to exit\n");
assert(child_pid==waitpid(child_pid,&status,0));
+ printf("checking exit status\n");
assert(WIFEXITED(status));
+ printf("success!\n");
}
void setup_env() {