Introduce xbps_{array,pkgdb}_foreach_cb_multi() and use it where appropiate.

In some tasks the single threaded implementation outperms the multithreaded
one. Use it where it really makes a difference. The _multi() routines do not
spawn any thread if _SC_NPROCESSORS_ONLN == 1.

Bump XBPS_API_VERSION.
This commit is contained in:
Juan RP 2013-09-17 16:30:13 +02:00
parent fce411bac6
commit 023484ca0b
9 changed files with 131 additions and 65 deletions

View File

@ -69,7 +69,7 @@ check_pkg_integrity_all(struct xbps_handle *xhp)
/* force an update to get total pkg count */ /* force an update to get total pkg count */
(void)xbps_pkgdb_update(xhp, false); (void)xbps_pkgdb_update(xhp, false);
rv = xbps_pkgdb_foreach_cb(xhp, pkgdb_cb, NULL); rv = xbps_pkgdb_foreach_cb_multi(xhp, pkgdb_cb, NULL);
if ((rv = xbps_pkgdb_update(xhp, true)) != 0) { if ((rv = xbps_pkgdb_update(xhp, true)) != 0) {
xbps_error_printf("failed to write pkgdb: %s\n", xbps_error_printf("failed to write pkgdb: %s\n",

View File

@ -215,7 +215,7 @@ find_longest_pkgver(struct xbps_handle *xhp, xbps_object_t o)
_find_longest_pkgver_cb, &ffl); _find_longest_pkgver_cb, &ffl);
xbps_object_release(array); xbps_object_release(array);
} else { } else {
(void)xbps_pkgdb_foreach_cb(xhp, (void)xbps_pkgdb_foreach_cb_multi(xhp,
_find_longest_pkgver_cb, &ffl); _find_longest_pkgver_cb, &ffl);
} }

View File

@ -31,13 +31,11 @@
#include <fnmatch.h> #include <fnmatch.h>
#include <dirent.h> #include <dirent.h>
#include <assert.h> #include <assert.h>
#include <pthread.h>
#include <xbps.h> #include <xbps.h>
#include "defs.h" #include "defs.h"
struct ffdata { struct ffdata {
pthread_mutex_t mtx;
int npatterns; int npatterns;
char **patterns; char **patterns;
const char *repouri; const char *repouri;
@ -102,8 +100,6 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp,
xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver); xbps_dictionary_get_cstring_nocopy(obj, "pkgver", &pkgver);
pthread_mutex_lock(&ffd->mtx);
pkgmetad = xbps_pkgdb_get_pkg_metadata(xhp, pkgver); pkgmetad = xbps_pkgdb_get_pkg_metadata(xhp, pkgver);
assert(pkgmetad); assert(pkgmetad);
@ -115,8 +111,6 @@ ownedby_pkgdb_cb(struct xbps_handle *xhp,
xbps_object_release(pkgmetad); xbps_object_release(pkgmetad);
xbps_object_release(files_keys); xbps_object_release(files_keys);
pthread_mutex_unlock(&ffd->mtx);
return 0; return 0;
} }
@ -128,7 +122,6 @@ ownedby(struct xbps_handle *xhp, int npatterns, char **patterns)
ffd.npatterns = npatterns; ffd.npatterns = npatterns;
ffd.patterns = patterns; ffd.patterns = patterns;
pthread_mutex_init(&ffd.mtx, NULL);
for (int i = 0; i < npatterns; i++) { for (int i = 0; i < npatterns; i++) {
rfile = realpath(patterns[i], NULL); rfile = realpath(patterns[i], NULL);
@ -189,7 +182,6 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns)
char *rfile; char *rfile;
int rv; int rv;
pthread_mutex_init(&ffd.mtx, NULL);
ffd.npatterns = npatterns; ffd.npatterns = npatterns;
ffd.patterns = patterns; ffd.patterns = patterns;
@ -199,7 +191,6 @@ repo_ownedby(struct xbps_handle *xhp, int npatterns, char **patterns)
patterns[i] = rfile; patterns[i] = rfile;
} }
rv = xbps_rpool_foreach(xhp, repo_ownedby_cb, &ffd); rv = xbps_rpool_foreach(xhp, repo_ownedby_cb, &ffd);
pthread_mutex_destroy(&ffd.mtx);
return rv; return rv;
} }

View File

@ -40,7 +40,6 @@
#include <libgen.h> #include <libgen.h>
#include <fnmatch.h> #include <fnmatch.h>
#include <assert.h> #include <assert.h>
#include <pthread.h>
#include <xbps.h> #include <xbps.h>
#include "defs.h" #include "defs.h"
@ -49,7 +48,6 @@ struct search_data {
int npatterns; int npatterns;
char **patterns; char **patterns;
int maxcols; int maxcols;
pthread_mutex_t mtx;
xbps_array_t results; xbps_array_t results;
}; };
@ -119,10 +117,8 @@ search_array_cb(struct xbps_handle *xhp _unused,
if ((xbps_pkgpattern_match(pkgver, sd->patterns[x])) || if ((xbps_pkgpattern_match(pkgver, sd->patterns[x])) ||
(strcasestr(pkgver, sd->patterns[x])) || (strcasestr(pkgver, sd->patterns[x])) ||
(strcasestr(desc, sd->patterns[x])) || vpkgfound) { (strcasestr(desc, sd->patterns[x])) || vpkgfound) {
pthread_mutex_lock(&sd->mtx);
xbps_array_add_cstring_nocopy(sd->results, pkgver); xbps_array_add_cstring_nocopy(sd->results, pkgver);
xbps_array_add_cstring_nocopy(sd->results, desc); xbps_array_add_cstring_nocopy(sd->results, desc);
pthread_mutex_unlock(&sd->mtx);
} }
} }
return 0; return 0;
@ -135,9 +131,6 @@ search_pkgs_cb(struct xbps_repo *repo, void *arg, bool *done _unused)
struct search_data *sd = arg; struct search_data *sd = arg;
int rv; int rv;
if (repo->idx == NULL)
return 0;
allkeys = xbps_dictionary_all_keys(repo->idx); allkeys = xbps_dictionary_all_keys(repo->idx);
rv = xbps_array_foreach_cb(repo->xhp, allkeys, repo->idx, search_array_cb, sd); rv = xbps_array_foreach_cb(repo->xhp, allkeys, repo->idx, search_array_cb, sd);
xbps_object_release(allkeys); xbps_object_release(allkeys);
@ -151,7 +144,6 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns)
struct search_data sd; struct search_data sd;
int rv; int rv;
pthread_mutex_init(&sd.mtx, NULL);
sd.npatterns = npatterns; sd.npatterns = npatterns;
sd.patterns = patterns; sd.patterns = patterns;
sd.maxcols = get_maxcols(); sd.maxcols = get_maxcols();
@ -166,7 +158,6 @@ repo_search(struct xbps_handle *xhp, int npatterns, char **patterns)
print_results(xhp, &sd); print_results(xhp, &sd);
xbps_object_release(sd.results); xbps_object_release(sd.results);
} }
pthread_mutex_destroy(&sd.mtx);
return rv; return rv;
} }

View File

@ -119,7 +119,7 @@ index_clean(struct xbps_handle *xhp, const char *repodir)
cbd.array = xbps_array_create(); cbd.array = xbps_array_create();
allkeys = xbps_dictionary_all_keys(idx); allkeys = xbps_dictionary_all_keys(idx);
rv = xbps_array_foreach_cb(xhp, allkeys, idx, idx_cleaner_cb, &cbd); rv = xbps_array_foreach_cb_multi(xhp, allkeys, idx, idx_cleaner_cb, &cbd);
xbps_object_release(allkeys); xbps_object_release(allkeys);
for (unsigned int x = 0; x < xbps_array_count(cbd.array); x++) { for (unsigned int x = 0; x < xbps_array_count(cbd.array); x++) {

View File

@ -147,7 +147,7 @@ remove_obsoletes(struct xbps_handle *xhp, const char *repodir)
} }
(void)closedir(dirp); (void)closedir(dirp);
rv = xbps_array_foreach_cb(xhp, array, NULL, cleaner_cb, repo); rv = xbps_array_foreach_cb_multi(xhp, array, NULL, cleaner_cb, repo);
xbps_repo_close(repo); xbps_repo_close(repo);
xbps_object_release(array); xbps_object_release(array);

View File

@ -46,7 +46,7 @@
* *
* This header documents the full API for the XBPS Library. * This header documents the full API for the XBPS Library.
*/ */
#define XBPS_API_VERSION "20130727-1" #define XBPS_API_VERSION "20130917"
#ifndef XBPS_VERSION #ifndef XBPS_VERSION
#define XBPS_VERSION "UNSET" #define XBPS_VERSION "UNSET"
@ -699,7 +699,7 @@ xbps_array_t xbps_find_pkg_obsoletes(struct xbps_handle *xhp,
/** /**
* Executes a function callback per a package dictionary registered * Executes a function callback per a package dictionary registered
* in master package database (pkgdb) plist (downwards). * in master package database (pkgdb) plist.
* *
* @param[in] xhp The pointer to the xbps_handle struct. * @param[in] xhp The pointer to the xbps_handle struct.
* @param[in] fn Function callback to run for any pkg dictionary. * @param[in] fn Function callback to run for any pkg dictionary.
@ -712,6 +712,24 @@ int xbps_pkgdb_foreach_cb(struct xbps_handle *xhp,
int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *),
void *arg); void *arg);
/**
* Executes a function callback per a package dictionary registered
* in master package database (pkgdb) plist.
*
* This is a multithreaded implementation spawning a thread per core. Each
* thread processes a fraction of total objects in the pkgdb dictionary.
*
* @param[in] xhp The pointer to the xbps_handle struct.
* @param[in] fn Function callback to run for any pkg dictionary.
* @param[in] arg Argument to be passed to the function callback.
*
* @return 0 on success (all objects were processed), otherwise
* the value returned by the function callback.
*/
int xbps_pkgdb_foreach_cb_multi(struct xbps_handle *xhp,
int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *),
void *arg);
/** /**
* Returns a package dictionary from master package database (pkgdb) plist, * Returns a package dictionary from master package database (pkgdb) plist,
* matching pkgname or pkgver object in \a pkg. * matching pkgname or pkgver object in \a pkg.
@ -813,12 +831,43 @@ int xbps_pkg_exec_script(struct xbps_handle *xhp,
/** @addtogroup plist */ /** @addtogroup plist */
/*@{*/ /*@{*/
/**
* Executes a function callback (\a fn) per object in the proplib array \a array.
*
* @param[in] xhp The pointer to the xbps_handle struct.
* @param[in] array The proplib array to traverse.
* @param[in] dict The dictionary associated with the array.
* @param[in] fn Function callback to run for any pkg dictionary.
* @param[in] arg Argument to be passed to the function callback.
*
* @return 0 on success (all objects were processed), otherwise
* the value returned by the function callback.
*/
int xbps_array_foreach_cb(struct xbps_handle *xhp, int xbps_array_foreach_cb(struct xbps_handle *xhp,
xbps_array_t array, xbps_array_t array,
xbps_dictionary_t dict, xbps_dictionary_t dict,
int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done), int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done),
void *arg); void *arg);
/**
* Executes a function callback (\a fn) per object in the proplib array \a array.
* This is a multithreaded implementation spawning a thread per core. Each
* thread processes a fraction of total objects in the array.
*
* @param[in] xhp The pointer to the xbps_handle struct.
* @param[in] array The proplib array to traverse.
* @param[in] dict The dictionary associated with the array.
* @param[in] fn Function callback to run for any pkg dictionary.
* @param[in] arg Argument to be passed to the function callback.
*
* @return 0 on success (all objects were processed), otherwise
* the value returned by the function callback.
*/
int xbps_array_foreach_cb_multi(struct xbps_handle *xhp,
xbps_array_t array,
xbps_dictionary_t dict,
int (*fn)(struct xbps_handle *, xbps_object_t obj, const char *, void *arg, bool *done),
void *arg);
/** /**
* Match a virtual package name or pattern by looking at package's * Match a virtual package name or pattern by looking at package's
* dictionary "provides" array object. * dictionary "provides" array object.

View File

@ -153,6 +153,24 @@ xbps_pkgdb_foreach_cb(struct xbps_handle *xhp,
return rv; return rv;
} }
int
xbps_pkgdb_foreach_cb_multi(struct xbps_handle *xhp,
int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *),
void *arg)
{
xbps_array_t allkeys;
int rv;
if ((rv = xbps_pkgdb_init(xhp)) != 0)
return rv;
allkeys = xbps_dictionary_all_keys(xhp->pkgdb);
assert(allkeys);
rv = xbps_array_foreach_cb_multi(xhp, allkeys, xhp->pkgdb, fn, arg);
xbps_object_release(allkeys);
return rv;
}
xbps_dictionary_t xbps_dictionary_t
xbps_pkgdb_get_pkg(struct xbps_handle *xhp, const char *pkg) xbps_pkgdb_get_pkg(struct xbps_handle *xhp, const char *pkg)
{ {

View File

@ -85,7 +85,7 @@ array_foreach_thread(void *arg)
} }
int int
xbps_array_foreach_cb(struct xbps_handle *xhp, xbps_array_foreach_cb_multi(struct xbps_handle *xhp,
xbps_array_t array, xbps_array_t array,
xbps_dictionary_t dict, xbps_dictionary_t dict,
int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *), int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *),
@ -106,50 +106,67 @@ xbps_array_foreach_cb(struct xbps_handle *xhp,
return 0; return 0;
maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN); maxthreads = (int)sysconf(_SC_NPROCESSORS_ONLN);
if (maxthreads > 1) { if (maxthreads == 1) /* use single threaded routine */
thd = calloc(maxthreads, sizeof(*thd)); return xbps_array_foreach_cb(xhp, array, dict, fn, arg);
assert(thd);
slicecount = arraycount / maxthreads;
pkgcount = 0;
pthread_mutex_init(&mtx, NULL);
for (int i = 0; i < maxthreads; i++) { thd = calloc(maxthreads, sizeof(*thd));
thd[i].mtx = &mtx; assert(thd);
thd[i].array = array; slicecount = arraycount / maxthreads;
thd[i].dict = dict; pkgcount = 0;
thd[i].xhp = xhp; pthread_mutex_init(&mtx, NULL);
thd[i].fn = fn;
thd[i].fn_arg = arg;
thd[i].start = pkgcount;
if (i + 1 >= maxthreads)
thd[i].end = arraycount;
else
thd[i].end = pkgcount + slicecount;
pthread_create(&thd[i].thread, NULL,
array_foreach_thread, &thd[i]);
pkgcount += slicecount;
}
/* wait for all threads */
for (int i = 0; i < maxthreads; i++)
pthread_join(thd[i].thread, NULL);
pthread_mutex_destroy(&mtx); for (int i = 0; i < maxthreads; i++) {
free(thd); thd[i].mtx = &mtx;
} else { thd[i].array = array;
/* single threaded */ thd[i].dict = dict;
struct thread_data mythd; thd[i].xhp = xhp;
thd[i].fn = fn;
mythd.mtx = NULL; thd[i].fn_arg = arg;
mythd.array = array; thd[i].start = pkgcount;
mythd.dict = dict; if (i + 1 >= maxthreads)
mythd.xhp = xhp; thd[i].end = arraycount;
mythd.start = 0; else
mythd.end = arraycount; thd[i].end = pkgcount + slicecount;
mythd.fn = fn; pthread_create(&thd[i].thread, NULL,
mythd.fn_arg = arg; array_foreach_thread, &thd[i]);
array_foreach_thread(&mythd); pkgcount += slicecount;
} }
/* wait for all threads */
for (int i = 0; i < maxthreads; i++)
rv = pthread_join(thd[i].thread, NULL);
pthread_mutex_destroy(&mtx);
free(thd);
return rv;
}
int
xbps_array_foreach_cb(struct xbps_handle *xhp,
xbps_array_t array,
xbps_dictionary_t dict,
int (*fn)(struct xbps_handle *, xbps_object_t, const char *, void *, bool *),
void *arg)
{
xbps_dictionary_t pkgd;
xbps_object_t obj;
const char *key;
int rv = 0;
bool loop_done = false;
for (unsigned int i = 0; i < xbps_array_count(array); i++) {
obj = xbps_array_get(array, i);
if (xbps_object_type(dict) == XBPS_TYPE_DICTIONARY) {
pkgd = xbps_dictionary_get_keysym(dict, obj);
key = xbps_dictionary_keysym_cstring_nocopy(obj);
} else {
pkgd = obj;
key = NULL;
}
rv = (*fn)(xhp, pkgd, key, arg, &loop_done);
if (rv != 0 || loop_done)
break;
}
return rv; return rv;
} }