import unittest import logging import os import tempfile import shutil import time import traceback import sys sys.path.append('../src') try: set() except NameError: from sets import Set as set import GrampsBSDDB import RelLib logger = logging.getLogger('Gramps.GrampsDbBase_Test') class ReferenceMapTest (unittest.TestCase): def setUp(self): self._tmpdir = tempfile.mkdtemp() self._filename = os.path.join(self._tmpdir,'test.grdb') self._db = GrampsBSDDB.GrampsBSDDB() self._db.load(self._filename, None, # callback "w") def tearDown(self): shutil.rmtree(self._tmpdir) def _populate_database(self, num_sources = 1, num_persons = 0, num_families = 0, num_events = 0, num_places = 0, num_media_objects = 0, num_links = 1): # start with sources sources = [] for i in xrange(0,num_sources): sources.append(self._add_source()) # now for each of the other tables. Give each entry a link # to num_link sources, sources are chosen on a round robin # basis for num, add_func in ((num_persons, self._add_person_with_sources), (num_families, self._add_family_with_sources), (num_events, self._add_event_with_sources), (num_places, self._add_place_with_sources), (num_media_objects, self._add_media_object_with_sources)): source_idx = 1 for person_idx in xrange(0,num): # Get the list of sources to link lnk_sources = set() for i in xrange(0,num_links): lnk_sources.add(sources[source_idx-1]) source_idx = (source_idx+1) % len(sources) try: add_func(lnk_sources) except: print "person_idx = ", person_idx print "lnk_sources = ", repr(lnk_sources) raise return def _add_source(self): # Add a Source tran = self._db.transaction_begin() source = RelLib.Source() self._db.add_source(source,tran) self._db.commit_source(source,tran) self._db.transaction_commit(tran, "Add Source") return source def _add_object_with_source(self,sources,object_class,add_method,commit_method): object = object_class() for source in sources: src_ref = RelLib.SourceRef() src_ref.set_base_handle(source.get_handle()) object.add_source_reference(src_ref) tran = self._db.transaction_begin() add_method(object,tran) commit_method(object,tran) self._db.transaction_commit(tran, "Add Object") return object def _add_person_with_sources(self,sources): return self._add_object_with_source(sources, RelLib.Person, self._db.add_person, self._db.commit_person) def _add_family_with_sources(self,sources): return self._add_object_with_source(sources, RelLib.Family, self._db.add_family, self._db.commit_family) def _add_event_with_sources(self,sources): return self._add_object_with_source(sources, RelLib.Event, self._db.add_event, self._db.commit_event) def _add_place_with_sources(self,sources): return self._add_object_with_source(sources, RelLib.Place, self._db.add_place, self._db.commit_place) def _add_media_object_with_sources(self,sources): return self._add_object_with_source(sources, RelLib.MediaObject, self._db.add_object, self._db.commit_media_object) def test_simple_lookup(self): """insert a record and a reference and check that a lookup for the reference returns the original record.""" source = self._add_source() person = self._add_person_with_sources([source]) references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ] assert len(references) == 1 assert references[0] == ('Person',person.get_handle()) def test_delete_primary(self): """check that deleting a primary will remove the backreferences from the reference_map""" source = self._add_source() person = self._add_person_with_sources([source]) assert self._db.get_person_from_handle(person.get_handle()) is not None tran = self._db.transaction_begin() self._db.remove_person(person.get_handle(),tran) self._db.transaction_commit(tran, "Del Person") assert self._db.get_person_from_handle(person.get_handle()) == None references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ] assert len(references) == 0, "len(references) == %s " % str(len(references)) def test_reindex_reference_map(self): """Test that the reindex function works.""" # unhook the reference_map update function so that we # can insert some records without the reference_map being updated. update_method = self._db._update_reference_map self._db._update_reference_map = lambda x,y: 1 # Insert a person/source pair. source = self._add_source() person = self._add_person_with_sources([source]) # Check that the reference map does not contain the reference. references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ] assert len(references) == 0, "len(references) == %s " % str(len(references)) # Reinstate the reference_map method and reindex the database self._db._update_reference_map = update_method self._db.reindex_reference_map() # Check that the reference now appears in the reference_map references = [ ref for ref in self._db.find_backlink_handles(source.get_handle()) ] assert len(references) == 1, "len(references) == %s " % str(len(references)) def _timeit(func,*args,**kwargs): start = time.time() func(*args,**kwargs) end = time.time() return end - start def perf_simple_search_speed(self): num_sources = 100 num_persons = 1000 num_families = 10 num_events = 10 num_places = 10 num_media_objects = 10 num_links = 10 self._populate_database(num_sources, num_persons, num_families, num_events, num_places, num_media_objects, num_links) # time searching for source backrefs with and without reference_map cur = self._db.get_source_cursor() handle,data = cur.first() cur.close() start = time.time() references = [ ref for ref in self._db.find_backlink_handles(handle) ] end = time.time() with_reference_map = end - start remember = self._db.__class__.find_backlink_handles self._db.__class__.find_backlink_handles = self._db.__class__.__base__.find_backlink_handles start = time.time() references = [ ref for ref in self._db.find_backlink_handles(handle) ] end = time.time() without_reference_map = end - start self._db.__class__.find_backlink_handles = remember logger.info("search test with following data: \n" "num_sources = %d \n" "num_persons = %d \n" "num_families = %d \n" "num_events = %d \n" "num_places = %d \n" "num_media_objects = %d \n" "num_links = %d" % (num_sources, num_persons, num_families, num_events, num_places, num_media_objects, num_links)) logger.info("with refs %s\n", str(with_reference_map)) logger.info("without refs %s\n", str(without_reference_map)) assert with_reference_map < (without_reference_map / 10), "Reference_map should an order of magnitude faster." def testSuite(): return unittest.makeSuite(ReferenceMapTest,'test') def perfSuite(): return unittest.makeSuite(ReferenceMapTest,'perf') if __name__ == '__main__': unittest.TextTestRunner().run(testSuite())