From bfd6b2e88e66e785785525bb7bcfc8eca84336cf Mon Sep 17 00:00:00 2001 From: Enno Boland Date: Mon, 2 Jul 2018 13:18:17 +0200 Subject: [PATCH] lib/plist.c: share workload more efficiently between threads Before this patch xbps had a constant slice count for every thread. This resulted in unbalanced thread usage if the workload was not homogeneous over the slices. This patch replaces the naive slice count approach with spinlock-based reservation. --- lib/plist.c | 80 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/lib/plist.c b/lib/plist.c index 0637e9f0..b1a5cd79 100644 --- a/lib/plist.c +++ b/lib/plist.c @@ -38,7 +38,10 @@ struct thread_data { xbps_dictionary_t dict; struct xbps_handle *xhp; unsigned int start; - unsigned int end; + unsigned int arraycount; + unsigned int *reserved; + pthread_spinlock_t *reserved_lock; + unsigned int slicecount; int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *); void *fn_arg; }; @@ -59,23 +62,33 @@ array_foreach_thread(void *arg) const char *key; int rv; bool loop_done = false; + unsigned i = thd->start; + unsigned int end = i + thd->slicecount; - /* process pkgs from start until end */ - for (unsigned int i = thd->start; i < thd->end; i++) { - obj = xbps_array_get(thd->array, i); - if (xbps_object_type(thd->dict) == XBPS_TYPE_DICTIONARY) { - pkgd = xbps_dictionary_get_keysym(thd->dict, obj); - key = xbps_dictionary_keysym_cstring_nocopy(obj); - /* ignore internal objs */ - if (strncmp(key, "_XBPS_", 6) == 0) - continue; - } else { - pkgd = obj; - key = NULL; + while(i < thd->arraycount) { + /* process pkgs from start until end */ + for (; i < end && i < thd->arraycount; i++) { + obj = xbps_array_get(thd->array, i); + if (xbps_object_type(thd->dict) == XBPS_TYPE_DICTIONARY) { + pkgd = xbps_dictionary_get_keysym(thd->dict, obj); + key = xbps_dictionary_keysym_cstring_nocopy(obj); + /* ignore internal objs */ + if (strncmp(key, "_XBPS_", 6) == 0) 
+ continue; + } else { + pkgd = obj; + key = NULL; + } + rv = (*thd->fn)(thd->xhp, pkgd, key, thd->fn_arg, &loop_done); + if (rv != 0 || loop_done) + return NULL; } - rv = (*thd->fn)(thd->xhp, pkgd, key, thd->fn_arg, &loop_done); - if (rv != 0 || loop_done) - break; + /* Reserve more elements to compute */ + pthread_spin_lock(thd->reserved_lock); + i = *thd->reserved; + end = i + thd->slicecount; + *thd->reserved = end; + pthread_spin_unlock(thd->reserved_lock); } return NULL; } @@ -88,8 +101,10 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp, void *arg) { struct thread_data *thd; - unsigned int arraycount, slicecount, pkgcount; + unsigned int arraycount, slicecount; int rv = 0, maxthreads; + unsigned int reserved = 0; + pthread_spinlock_t reserved_lock; assert(fn != NULL); @@ -101,13 +116,25 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp, return 0; maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); - if (maxthreads <= 0 || maxthreads == 1) /* use single threaded routine */ + if (maxthreads <= 1 || arraycount <= 1) /* use single threaded routine */ return xbps_array_foreach_cb(xhp, array, dict, fn, arg); + if (pthread_spin_init(&reserved_lock, PTHREAD_PROCESS_PRIVATE) != 0) + return 0; + thd = calloc(maxthreads, sizeof(*thd)); assert(thd); - slicecount = arraycount / maxthreads; - pkgcount = 0; + + // maxthread is boundchecked to be > 1 + if((unsigned int)maxthreads >= arraycount) { + maxthreads = arraycount; + slicecount = 1; + } else { + slicecount = arraycount / maxthreads; + if (slicecount > 32) { + slicecount = 32; + } + } for (int i = 0; i < maxthreads; i++) { thd[i].array = array; @@ -115,20 +142,21 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp, thd[i].xhp = xhp; thd[i].fn = fn; thd[i].fn_arg = arg; - thd[i].start = pkgcount; - if (i + 1 >= maxthreads) - thd[i].end = arraycount; - else - thd[i].end = pkgcount + slicecount; + thd[i].start = i * slicecount; + thd[i].reserved = &reserved; + thd[i].reserved_lock = &reserved_lock; + 
thd[i].slicecount = slicecount; + thd[i].arraycount = arraycount; + pthread_create(&thd[i].thread, NULL, array_foreach_thread, &thd[i]); - pkgcount += slicecount; } /* wait for all threads */ for (int i = 0; i < maxthreads; i++) rv = pthread_join(thd[i].thread, NULL); free(thd); + pthread_spin_destroy(&reserved_lock); return rv; }