[Sugar-devel] [PATCH] RFC: Optimizer: simplify, avoid hashing files, remove 'checksums' dir

Martin Langhoff martin at laptop.org
Wed Sep 26 15:23:56 EDT 2012


This patch changes the strategy used by the optimizer.
Instead of maintaining a 'checksum' metadata field for every
file, plus a 'checksums' dirtree recording which entries are
hardlinked, it only ever hashes files whose size matches a
new file's size _exactly_.

A DS that never sees two files of identical size will never
hash any file at all.

 - Allows the datastore to handle large files (downloads,
   video captures) without paying a high 'hashing' price.

 - Removes the 'checksums' dirtree, which was redundant:
   we can tell whether two files are hardlinked by comparing
   their inodes (see the sketch after this list).

 - There are no users of the 'checksum' metadata field outside
   the DS, so we remove it completely.

 - Removal of checksum metadata and the 'checksums' dirtree
   means less risk of out-of-date metadata.
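
The inode check the second bullet refers to amounts to something like
this (a minimal sketch; the helper name is mine, not part of the patch):

    import os

    def already_hardlinked(path_a, path_b):
        # two paths share storage iff they live on the same device
        # and point at the same inode
        st_a = os.stat(path_a)
        st_b = os.stat(path_b)
        return (st_a.st_dev, st_a.st_ino) == (st_b.st_dev, st_b.st_ino)

os.path.samefile() performs the same comparison; the optimizer below
groups stat results by st_ino directly rather than comparing pairs.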

Caveats:

 - Assumes no program produces large files of identical size.
   On the XO this is true today, but uncompressed file formats
   could trigger unwelcome hashing (e.g. an activity that saves
   TIFF or BMP files).

 - The optimizer should run in a separate process, to avoid making the DS
   unresponsive. The code is written to avoid races, but once it moves
   out-of-process it would need to flock() the file between hashing it
   and hardlinking it (rough sketch below).
   I still need to read up on forking a long-running process from a D-Bus
   service.
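
Roughly what I have in mind for the flock() part, assuming the writer
side of the DS would honour the same advisory lock (the names and the
re-check step are illustrative, not in this patch):

    import fcntl
    import os

    def link_if_unchanged(fpath, target_path, expected_csum, md5sum):
        # hold an exclusive advisory lock across the re-hash and the
        # hardlink swap, so a concurrent writer cannot change the file
        # in between; back off if the content no longer matches
        f = open(fpath, 'rb')
        try:
            fcntl.flock(f, fcntl.LOCK_EX)
            if md5sum(fpath) != expected_csum:
                return False
            temp_path = fpath + '.tmp'
            if os.path.exists(temp_path):
                os.remove(temp_path)
            os.link(target_path, temp_path)
            os.rename(temp_path, fpath)
            return True
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
            f.close()
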
---
Background: while reviewing the DS code and data structures, the
'checksums' dirtree and the strategy in optimizer.py looked problematic
to me. If I am saving a large video, I don't want it hashed (an
expensive operation) when I can cheaply check that no other file has
the same size.
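
The cheap check boils down to bucketing data files by st_size and only
looking inside buckets that hold two or more entries; this is condensed
from the optimizer code below, with data_files standing in for the list
that 'find' produces:

    import os
    from collections import defaultdict

    data_files = []  # paths to the per-entry 'data' files

    by_size = defaultdict(list)
    for fpath in data_files:
        by_size[os.stat(fpath).st_size].append(fpath)

    # buckets with a single entry can never need hashing
    candidates = [grp for grp in by_size.values() if len(grp) > 1]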

I could retain the 'checksum' metadata field -- and add some assurance
that it does not go stale. That would help with large files of identical
size, such as screenshots, which might otherwise end up hashed more than
once, perhaps many times.
---
 src/carquinyol/datastore.py     |    4 -
 src/carquinyol/filestore.py     |   12 ---
 src/carquinyol/layoutmanager.py |    9 --
 src/carquinyol/metadatastore.py |    4 +-
 src/carquinyol/optimizer.py     |  225 ++++++++++++++++++---------------------
 5 files changed, 106 insertions(+), 148 deletions(-)

diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index a859dfe..0ee70d2 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -362,9 +362,6 @@ class DataStore(dbus.service.Object):
         self._metadata_store.store(uid, props)
         self._index_store.store(uid, props)
 
-        if os.path.exists(self._file_store.get_file_path(uid)) and \
-                (not file_path or os.path.exists(file_path)):
-            self._optimizer.remove(uid)
         self._file_store.store(uid, file_path, transfer_ownership,
                 lambda * args: self._update_completion_cb(async_cb,
                                                          async_err_cb,
@@ -489,7 +486,6 @@ class DataStore(dbus.service.Object):
         self._mark_dirty()
         try:
             entry_path = layoutmanager.get_instance().get_entry_path(uid)
-            self._optimizer.remove(uid)
             self._index_store.delete(uid)
             self._file_store.delete(uid)
             self._metadata_store.delete(uid)
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index 0b34b69..38a4018 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -146,18 +146,6 @@ class FileStore(object):
         if os.path.exists(file_path):
             os.remove(file_path)
 
-    def hard_link_entry(self, new_uid, existing_uid):
-        existing_file = layoutmanager.get_instance().get_data_path(
-            existing_uid)
-        new_file = layoutmanager.get_instance().get_data_path(new_uid)
-
-        logging.debug('removing %r', new_file)
-        os.remove(new_file)
-
-        logging.debug('hard linking %r -> %r', new_file, existing_file)
-        os.link(existing_file, new_file)
-
-
 class AsyncCopy(object):
     """Copy a file in chunks in the idle loop.
 
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index 3179a98..ee8270c 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -35,9 +35,6 @@ class LayoutManager(object):
         if not os.path.exists(self._root_path):
             os.makedirs(self._root_path)
 
-        self._create_if_needed(self.get_checksums_dir())
-        self._create_if_needed(self.get_queue_path())
-
     def _create_if_needed(self, path):
         if not os.path.exists(path):
             os.makedirs(path)
@@ -74,12 +71,6 @@ class LayoutManager(object):
     def get_index_path(self):
         return os.path.join(self._root_path, 'index')
 
-    def get_checksums_dir(self):
-        return os.path.join(self._root_path, 'checksums')
-
-    def get_queue_path(self):
-        return os.path.join(self.get_checksums_dir(), 'queue')
-
     def find_all(self):
         uids = []
         for f in os.listdir(self._root_path):
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 52cc10f..b6c2497 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -4,8 +4,6 @@ from carquinyol import layoutmanager
 from carquinyol import metadatareader
 
 MAX_SIZE = 256
-_INTERNAL_KEYS = ['checksum']
-
 
 class MetadataStore(object):
 
@@ -16,7 +14,7 @@ class MetadataStore(object):
         else:
             received_keys = metadata.keys()
             for key in os.listdir(metadata_path):
-                if key not in _INTERNAL_KEYS and key not in received_keys:
+                if key not in received_keys:
                     os.remove(os.path.join(metadata_path, key))
 
         metadata['uid'] = uid
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index c038c2b..2129937 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -30,133 +30,118 @@ class Optimizer(object):
 
     def __init__(self, file_store, metadata_store):
         self._file_store = file_store
-        self._metadata_store = metadata_store
-        self._enqueue_checksum_id = None
+        self._root_path = layoutmanager.get_instance().get_root_path()
+        self._optimized_flag_path = os.path.join(self._root_path, 'optimized')
 
     def optimize(self, uid):
-        """Add an entry to a queue of entries to be checked for duplicates.
+        """Schedule an optimization.
 
         """
         if not os.path.exists(self._file_store.get_file_path(uid)):
             return
-
-        queue_path = layoutmanager.get_instance().get_queue_path()
-        open(os.path.join(queue_path, uid), 'w').close()
-        logging.debug('optimize %r', os.path.join(queue_path, uid))
-
-        if self._enqueue_checksum_id is None:
-            self._enqueue_checksum_id = \
-                    gobject.idle_add(self._process_entry_cb,
-                                     priority=gobject.PRIORITY_LOW)
-
-    def remove(self, uid):
-        """Remove any structures left from space optimization
-
-        """
-        checksum = self._metadata_store.get_property(uid, 'checksum')
-        if checksum is None:
-            return
-
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        checksum_entry_path = os.path.join(checksum_path, uid)
-
-        if os.path.exists(checksum_entry_path):
-            logging.debug('remove %r', checksum_entry_path)
-            os.remove(checksum_entry_path)
-
-        if os.path.exists(checksum_path):
-            try:
-                os.rmdir(checksum_path)
-                logging.debug('removed %r', checksum_path)
-            except OSError, e:
-                if e.errno != errno.ENOTEMPTY:
-                    raise
-
-    def _identical_file_already_exists(self, checksum):
-        """Check if we already have files with this checksum.
-
-        """
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        return os.path.exists(checksum_path)
-
-    def _get_uid_from_checksum(self, checksum):
-        """Get an existing entry which file matches checksum.
-
-        """
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        first_uid = os.listdir(checksum_path)[0]
-        return first_uid
-
-    def _create_checksum_dir(self, checksum):
-        """Create directory that tracks files with this same checksum.
-
-        """
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        logging.debug('create dir %r', checksum_path)
-        os.mkdir(checksum_path)
-
-    def _add_checksum_entry(self, uid, checksum):
-        """Create a file in the checksum dir with the uid of the entry
-
-        """
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-
-        logging.debug('touch %r', os.path.join(checksum_path, uid))
-        open(os.path.join(checksum_path, uid), 'w').close()
-
-    def _already_linked(self, uid, checksum):
-        """Check if this entry's file is already a hard link to the checksums
-           dir.
-
-        """
-        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
-        checksum_path = os.path.join(checksums_dir, checksum)
-        return os.path.exists(os.path.join(checksum_path, uid))
-
-    def _process_entry_cb(self):
-        """Process one item in the checksums queue by calculating its checksum,
-           checking if there exist already an identical file, and in that case
-           substituting its file with a hard link to that pre-existing file.
-
+        logging.debug('schedule optimize')
+        gobject.idle_add(self._deferred_optimize_cb,
+                         priority=gobject.PRIORITY_LOW)
+
+    def _deferred_optimize_cb(self):
+        """Optimize trying to avoid running costly md5sum.
+           We query the stat of data files to find
+           file pairs of the same size, and we check they
+           are not already linked reading the inode.
+           We keep a flag indicating last run, we look for
+           files newer than the flag, to avoid re-hashing
+           old file-pairs that don't match.
+           This seems like a lot of work, but it's done
+           on dirents which are in the kernel cache, so
+           it is virtually free.
+           The big win comes from never hashing large files.
+           In the rare case where two large files are exactly
+           the same size, there's good cause to hash them.
+           FIXME: fork to a bg process.
         """
-        queue_path = layoutmanager.get_instance().get_queue_path()
-        queue = os.listdir(queue_path)
-        if queue:
-            uid = queue[0]
-            logging.debug('_process_entry_cb processing %r', uid)
-
-            file_in_entry_path = self._file_store.get_file_path(uid)
-            if not os.path.exists(file_in_entry_path):
-                logging.info('non-existent entry in queue: %r', uid)
-            else:
-                checksum = self._calculate_md5sum(file_in_entry_path)
-                self._metadata_store.set_property(uid, 'checksum', checksum)
-
-                if self._identical_file_already_exists(checksum):
-                    if not self._already_linked(uid, checksum):
-                        existing_entry_uid = \
-                                self._get_uid_from_checksum(checksum)
-
-                        self._file_store.hard_link_entry(uid,
-                                                         existing_entry_uid)
-
-                        self._add_checksum_entry(uid, checksum)
-                else:
-                    self._create_checksum_dir(checksum)
-                    self._add_checksum_entry(uid, checksum)
-
-            os.remove(os.path.join(queue_path, uid))
-
-        if len(queue) <= 1:
-            self._enqueue_checksum_id = None
-            return False
-        else:
-            return True
+        logging.debug('_deferred_optimize_cb')
+        last_optimized = 0
+        if os.path.exists(self._optimized_flag_path):
+            last_optimized = os.stat(self._optimized_flag_path).st_mtime
+        # touch it now, to win races
+        f = open(self._optimized_flag_path, 'w')
+        f.write('.')
+        f.close()
+
+        datafiles = subprocess.check_output(['find', self._root_path,
+                                             '-mindepth', '3',
+                                             '-maxdepth', '3',
+                                             '-type', 'f',
+                                             '-name', 'data',
+                                             '-print0'])
+        by_size = {}
+        recent = []
+        for fpath in datafiles.split('\0'):
+            if not fpath:
+                continue
+            s = os.stat(fpath)
+            dfile = [fpath, s]
+            if s.st_mtime > last_optimized:
+                recent.append(dfile)
+            if not by_size.has_key(s.st_size):
+                by_size[s.st_size] = []
+            by_size[s.st_size].append(dfile)
+
+        sz_seen = []
+        for dfile in recent:
+            # only worth md5summing files of the same size,
+            # and only evaluate each given size once
+            if dfile[1].st_size in sz_seen:
+                continue
+            sz_seen.append(dfile[1].st_size)
+
+            candidates = by_size[dfile[1].st_size]
+            if len(candidates) < 2:
+                # only file this size
+                continue
+
+            # avoid csum of already hardlinked files
+            by_inode = {}
+            for cdate in candidates:
+                inode = cdate[1].st_ino
+                if not by_inode.has_key(inode):
+                    by_inode[inode] = []
+                by_inode[inode].append(cdate)
+            if len(by_inode.keys()) == 1:
+                # hardlinked already
+                continue
+
+            by_csum = {}
+            # cluster by csum.
+            # only csum once per inode
+            for inode in by_inode.keys():
+                # use fpath from the first entry
+                fpath = by_inode[inode][0][0]
+                logging.debug('optimize: md5sum(%s)' % fpath)
+                csum = self._calculate_md5sum(fpath)
+                logging.debug('optimize: md5sum(%s) done' % fpath)
+                if not by_csum.has_key(csum):
+                    by_csum[csum] = []
+                by_csum[csum].append(inode)
+
+            for csum in by_csum.keys():
+                if len(by_csum[csum]) == 1:
+                    continue
+                fpath_grp = []
+                for inode in by_csum[csum]:
+                    for cdate in by_inode[inode]:
+                        fpath_grp.append(cdate[0])
+                target_path = fpath_grp.pop()
+                for fpath in fpath_grp:
+                    logging.debug('hard linking %r -> %r', fpath, target_path)
+                    temp_path = fpath+'.tmp'
+                    if os.path.exists(temp_path):
+                        os.remove(temp_path)
+                    os.link(target_path, temp_path)
+                    os.rename(temp_path, fpath)
+
+        # indicates to the idle loop caller we're done
+        return False
 
     def _calculate_md5sum(self, path):
         """Calculate the md5 checksum of a given file.
-- 
1.7.10.4


