lib/plist.c: share workload more efficiently between threads

Before this patch xbps used a constant slice count for every thread:
each thread was handed one fixed slice of the array up front. This
resulted in unbalanced thread usage whenever the workload was not
spread homogeneously across the slices, since threads with cheap
slices finished early and then sat idle.

This patch replaces the naive fixed slice count with spinlock-based
reservation: each thread repeatedly claims the next small slice of
work from a shared counter until the whole array has been processed.
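
For readers unfamiliar with the pattern, below is a minimal, self-contained
sketch of spinlock-based work reservation. It is illustrative only: the
thread count, slice size, item count and the names (worker, NITEMS, SLICE)
are invented for this example and are not taken from lib/plist.c.

    /* build with: cc -pthread reserve.c */
    #include <pthread.h>
    #include <stdio.h>

    #define NITEMS   100
    #define NTHREADS 4
    #define SLICE    8

    static unsigned int reserved;           /* next unclaimed index */
    static pthread_spinlock_t reserved_lock;

    static void *
    worker(void *arg)
    {
        long id = (long)arg;
        unsigned int start, end;

        for (;;) {
            /* claim the next slice of indices under the spinlock */
            pthread_spin_lock(&reserved_lock);
            start = reserved;
            end = start + SLICE;
            reserved = end;
            pthread_spin_unlock(&reserved_lock);

            if (start >= NITEMS)          /* nothing left to claim */
                return NULL;
            for (unsigned int i = start; i < end && i < NITEMS; i++)
                printf("thread %ld processes item %u\n", id, i);
        }
    }

    int
    main(void)
    {
        pthread_t thr[NTHREADS];

        pthread_spin_init(&reserved_lock, PTHREAD_PROCESS_PRIVATE);
        for (long i = 0; i < NTHREADS; i++)
            pthread_create(&thr[i], NULL, worker, (void *)i);
        for (int i = 0; i < NTHREADS; i++)
            pthread_join(thr[i], NULL);
        pthread_spin_destroy(&reserved_lock);
        return 0;
    }

Threads that get cheap items simply come back to the counter sooner and
claim more slices, which is the rebalancing the commit is after.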
Author: Enno Boland
Date:   2018-07-02 13:18:17 +02:00
Commit: bfd6b2e88e (parent 3f4d6a93be)

diff --git a/lib/plist.c b/lib/plist.c
--- a/lib/plist.c
+++ b/lib/plist.c
@@ -38,7 +38,10 @@ struct thread_data {
     xbps_dictionary_t dict;
     struct xbps_handle *xhp;
     unsigned int start;
-    unsigned int end;
+    unsigned int arraycount;
+    unsigned int *reserved;
+    pthread_spinlock_t *reserved_lock;
+    unsigned int slicecount;
     int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *);
     void *fn_arg;
 };
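
For orientation, here is one reading of the per-thread descriptor after the
change. The comments are editorial, and the two fields not visible in this
hunk (thread, array) are inferred from how thd[i].thread and thd[i].array
are used further down in the diff:

    struct thread_data {
        pthread_t thread;                   /* worker thread handle */
        xbps_array_t array;                 /* shared input array */
        xbps_dictionary_t dict;
        struct xbps_handle *xhp;
        unsigned int start;                 /* first index initially assigned */
        unsigned int arraycount;            /* total number of array elements */
        unsigned int *reserved;             /* shared "next unclaimed index" counter */
        pthread_spinlock_t *reserved_lock;  /* guards *reserved */
        unsigned int slicecount;            /* elements claimed per reservation */
        int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *);
        void *fn_arg;
    };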
@@ -59,23 +62,33 @@ array_foreach_thread(void *arg)
     const char *key;
     int rv;
     bool loop_done = false;
+    unsigned i = thd->start;
+    unsigned int end = i + thd->slicecount;
 
-    /* process pkgs from start until end */
-    for (unsigned int i = thd->start; i < thd->end; i++) {
-        obj = xbps_array_get(thd->array, i);
-        if (xbps_object_type(thd->dict) == XBPS_TYPE_DICTIONARY) {
-            pkgd = xbps_dictionary_get_keysym(thd->dict, obj);
-            key = xbps_dictionary_keysym_cstring_nocopy(obj);
-            /* ignore internal objs */
-            if (strncmp(key, "_XBPS_", 6) == 0)
-                continue;
-        } else {
-            pkgd = obj;
-            key = NULL;
+    while(i < thd->arraycount) {
+        /* process pkgs from start until end */
+        for (; i < end && i < thd->arraycount; i++) {
+            obj = xbps_array_get(thd->array, i);
+            if (xbps_object_type(thd->dict) == XBPS_TYPE_DICTIONARY) {
+                pkgd = xbps_dictionary_get_keysym(thd->dict, obj);
+                key = xbps_dictionary_keysym_cstring_nocopy(obj);
+                /* ignore internal objs */
+                if (strncmp(key, "_XBPS_", 6) == 0)
+                    continue;
+            } else {
+                pkgd = obj;
+                key = NULL;
+            }
+            rv = (*thd->fn)(thd->xhp, pkgd, key, thd->fn_arg, &loop_done);
+            if (rv != 0 || loop_done)
+                return NULL;
         }
-        rv = (*thd->fn)(thd->xhp, pkgd, key, thd->fn_arg, &loop_done);
-        if (rv != 0 || loop_done)
-            break;
+        /* Reserve more elements to compute */
+        pthread_spin_lock(thd->reserved_lock);
+        i = *thd->reserved;
+        end = i + thd->slicecount;
+        *thd->reserved = end;
+        pthread_spin_unlock(thd->reserved_lock);
     }
     return NULL;
 }
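
The reservation step deserves a closer look. The critical section is only one
read and two writes of a shared counter, so a spinlock is a reasonable choice
over a mutex here: a briefly contending thread spins for a few cycles instead
of being put to sleep. The same step again, with editorial comments added (not
a proposed change to the committed code):

        /* Reserve more elements to compute */
        pthread_spin_lock(thd->reserved_lock);
        i = *thd->reserved;            /* first index of our new slice */
        end = i + thd->slicecount;     /* one past the last index we own */
        *thd->reserved = end;          /* publish the new high-water mark */
        pthread_spin_unlock(thd->reserved_lock);

An atomic fetch-and-add on the shared counter would achieve the same effect;
the spinlock version shown here is what the commit uses.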
@@ -88,8 +101,10 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp,
     void *arg)
 {
     struct thread_data *thd;
-    unsigned int arraycount, slicecount, pkgcount;
+    unsigned int arraycount, slicecount;
     int rv = 0, maxthreads;
+    unsigned int reserved = 0;
+    pthread_spinlock_t reserved_lock;
 
     assert(fn != NULL);
 
@@ -101,13 +116,25 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp,
         return 0;
 
     maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN);
-    if (maxthreads <= 0 || maxthreads == 1) /* use single threaded routine */
+    if (maxthreads <= 1 || arraycount <= 1) /* use single threaded routine */
         return xbps_array_foreach_cb(xhp, array, dict, fn, arg);
 
+    if (pthread_spin_init(&reserved_lock, PTHREAD_PROCESS_PRIVATE) != 0)
+        return 0;
+
     thd = calloc(maxthreads, sizeof(*thd));
     assert(thd);
-    slicecount = arraycount / maxthreads;
-    pkgcount = 0;
+
+    // maxthread is boundchecked to be > 1
+    if((unsigned int)maxthreads >= arraycount) {
+        maxthreads = arraycount;
+        slicecount = 1;
+    } else {
+        slicecount = arraycount / maxthreads;
+        if (slicecount > 32) {
+            slicecount = 32;
+        }
+    }
 
     for (int i = 0; i < maxthreads; i++) {
         thd[i].array = array;
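
To make the new sizing rule concrete, here is a standalone restatement with a
few invented inputs; compute_slicecount() is a hypothetical helper written for
this illustration, not a function in xbps:

    #include <stdio.h>

    /* mirrors the sizing logic of the hunk above */
    static unsigned int
    compute_slicecount(unsigned int arraycount, int *maxthreads)
    {
        unsigned int slicecount;

        if ((unsigned int)*maxthreads >= arraycount) {
            *maxthreads = arraycount;   /* never more threads than items */
            slicecount = 1;
        } else {
            slicecount = arraycount / maxthreads[0];
            if (slicecount > 32)
                slicecount = 32;        /* cap slice size */
        }
        return slicecount;
    }

    int
    main(void)
    {
        int t = 8;
        printf("%u\n", compute_slicecount(5, &t));    /* 1, t reduced to 5 */
        t = 4;
        printf("%u\n", compute_slicecount(100, &t));  /* 25 */
        t = 4;
        printf("%u\n", compute_slicecount(4000, &t)); /* 1000 capped to 32 */
        return 0;
    }

The 32-element cap keeps slices small, so threads return to the shared counter
frequently and uneven per-package costs get rebalanced instead of being locked
into one huge per-thread slice.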
@@ -115,20 +142,21 @@ xbps_array_foreach_cb_multi(struct xbps_handle *xhp,
         thd[i].xhp = xhp;
         thd[i].fn = fn;
         thd[i].fn_arg = arg;
-        thd[i].start = pkgcount;
-        if (i + 1 >= maxthreads)
-            thd[i].end = arraycount;
-        else
-            thd[i].end = pkgcount + slicecount;
+        thd[i].start = i * slicecount;
+        thd[i].reserved = &reserved;
+        thd[i].reserved_lock = &reserved_lock;
+        thd[i].slicecount = slicecount;
+        thd[i].arraycount = arraycount;
         pthread_create(&thd[i].thread, NULL,
             array_foreach_thread, &thd[i]);
-        pkgcount += slicecount;
     }
     /* wait for all threads */
     for (int i = 0; i < maxthreads; i++)
         rv = pthread_join(thd[i].thread, NULL);
 
     free(thd);
+    pthread_spin_destroy(&reserved_lock);
 
     return rv;
 }
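
Finally, a hedged sketch of how a caller might drive this routine. The
function name xbps_array_foreach_cb_multi() and the callback signature come
from the diff above; count_cb, keys, pkgdict and the atomic increment are
invented for the example and assume the caller already holds an initialized
struct xbps_handle plus the relevant dictionary and key array:

    /* Callback invoked once per array element, possibly from several worker
     * threads at once, so any shared state (here: the counter) needs its own
     * synchronization. */
    static int
    count_cb(struct xbps_handle *xhp, xbps_object_t obj, const char *key,
            void *arg, bool *done)
    {
        unsigned int *counter = arg;

        (void)xhp; (void)obj; (void)key; (void)done;
        __atomic_fetch_add(counter, 1, __ATOMIC_RELAXED);
        return 0;    /* non-zero (or *done = true) stops that worker's loop */
    }

        /* ... in a caller that already has xhp, keys and pkgdict ... */
        unsigned int npkgs = 0;
        xbps_array_foreach_cb_multi(xhp, keys, pkgdict, count_cb, &npkgs);

Because callbacks now run on whichever thread claimed the element, callers
cannot assume any particular ordering or thread affinity; that was already
true before this commit, which only changes how the elements are handed out.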