123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
- # Copyright 2012 Google, Inc.
- # Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- :module: watchdog.utils.dirsnapshot
- :synopsis: Directory snapshots and comparison.
- :author: yesudeep@google.com (Yesudeep Mangalapilly)
- .. ADMONITION:: Where are the moved events? They "disappeared"
- This implementation does not take partition boundaries
- into consideration. It will only work when the directory
- tree is entirely on the same file system. More specifically,
- any part of the code that depends on inode numbers can
- break if partition boundaries are crossed. In these cases,
- the snapshot diff will represent file/directory movement as
- created and deleted events.
- Classes
- -------
- .. autoclass:: DirectorySnapshot
- :members:
- :show-inheritance:
- .. autoclass:: DirectorySnapshotDiff
- :members:
- :show-inheritance:
- """
- import errno
- import os
- from stat import S_ISDIR
- from watchdog.utils import stat as default_stat
- try:
- from os import scandir
- except ImportError:
- from os import listdir as scandir
class DirectorySnapshotDiff(object):
    """
    Compares two directory snapshots and creates an object that represents
    the difference between the two snapshots.

    Paths are matched between the snapshots by the identifier returned from
    ``inode(path)``: a path that disappeared from ``ref`` but whose identifier
    is still known to ``snapshot`` is reported as moved rather than as a
    deleted/created pair.

    :param ref:
        The reference directory snapshot.
    :type ref:
        :class:`DirectorySnapshot`
    :param snapshot:
        The directory snapshot which will be compared
        with the reference snapshot.
    :type snapshot:
        :class:`DirectorySnapshot`
    """

    def __init__(self, ref, snapshot):
        # ``DirectorySnapshot.paths`` builds a brand new set on every access,
        # so read each side exactly once and reuse the result.
        ref_paths = ref.paths
        snapshot_paths = snapshot.paths

        created = snapshot_paths - ref_paths
        deleted = ref_paths - snapshot_paths
        modified = set()

        # Single pass over the paths present in both snapshots:
        #  * a different inode means the path was replaced, which is reported
        #    as both a deletion and a creation;
        #  * the same inode with a different mtime or size is a modification.
        for path in ref_paths & snapshot_paths:
            if ref.inode(path) != snapshot.inode(path):
                created.add(path)
                deleted.add(path)
            elif (ref.mtime(path) != snapshot.mtime(path)
                    or ref.size(path) != snapshot.size(path)):
                modified.add(path)

        # Find moved paths. Iterate over copies because the sets are
        # mutated while candidates are being examined.
        moved = set()
        for path in list(deleted):
            new_path = snapshot.path(ref.inode(path))
            if new_path:
                # The inode still exists: the path was moved, not deleted.
                deleted.remove(path)
                moved.add((path, new_path))

        for path in list(created):
            old_path = ref.path(snapshot.inode(path))
            if old_path:
                # The inode already existed: this is a move destination.
                created.remove(path)
                moved.add((old_path, path))

        # A moved path may also have been modified; it is reported under
        # its old (reference) path.
        for old_path, new_path in moved:
            if (ref.mtime(old_path) != snapshot.mtime(new_path)
                    or ref.size(old_path) != snapshot.size(new_path)):
                modified.add(old_path)

        # Split every category into directories and files. Created paths
        # only exist in ``snapshot``; all other categories are keyed by a
        # path that exists in ``ref``.
        self._dirs_created = [path for path in created if snapshot.isdir(path)]
        self._dirs_deleted = [path for path in deleted if ref.isdir(path)]
        self._dirs_modified = [path for path in modified if ref.isdir(path)]
        self._dirs_moved = [(frm, to) for (frm, to) in moved if ref.isdir(frm)]

        self._files_created = list(created - set(self._dirs_created))
        self._files_deleted = list(deleted - set(self._dirs_deleted))
        self._files_modified = list(modified - set(self._dirs_modified))
        self._files_moved = list(moved - set(self._dirs_moved))

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        # Only the number of entries per category is shown, not the paths.
        fmt = (
            '<{0} files(created={1}, deleted={2}, modified={3}, moved={4}),'
            ' folders(created={5}, deleted={6}, modified={7}, moved={8})>'
        )
        return fmt.format(
            type(self).__name__,
            len(self._files_created),
            len(self._files_deleted),
            len(self._files_modified),
            len(self._files_moved),
            len(self._dirs_created),
            len(self._dirs_deleted),
            len(self._dirs_modified),
            len(self._dirs_moved)
        )

    @property
    def files_created(self):
        """List of files that were created."""
        return self._files_created

    @property
    def files_deleted(self):
        """List of files that were deleted."""
        return self._files_deleted

    @property
    def files_modified(self):
        """List of files that were modified."""
        return self._files_modified

    @property
    def files_moved(self):
        """
        List of files that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._files_moved

    @property
    def dirs_modified(self):
        """
        List of directories that were modified.
        """
        return self._dirs_modified

    @property
    def dirs_moved(self):
        """
        List of directories that were moved.

        Each event is a two-tuple the first item of which is the path
        that has been renamed to the second item in the tuple.
        """
        return self._dirs_moved

    @property
    def dirs_deleted(self):
        """
        List of directories that were deleted.
        """
        return self._dirs_deleted

    @property
    def dirs_created(self):
        """
        List of directories that were created.
        """
        return self._dirs_created
class DirectorySnapshot(object):
    """
    A snapshot of stat information of files in a directory.

    :param path:
        The directory path for which a snapshot should be taken.
    :type path:
        ``str``
    :param recursive:
        ``True`` if the entire directory tree should be included in the
        snapshot; ``False`` otherwise.
    :type recursive:
        ``bool``
    :param walker_callback:
        .. deprecated:: 0.7.2

        A function with the signature ``walker_callback(path, stat_info)``
        which will be called for every entry found below ``path``.
    :param stat:
        Use custom stat function that returns a stat structure for path.
        Currently only st_dev, st_ino, st_mode, st_mtime and st_size are
        needed.
    :param listdir:
        Use custom listdir function. For details see ``os.scandir`` if
        available, else ``os.listdir``. Entries may be plain names or
        objects exposing a ``name`` attribute.
    """

    def __init__(self, path, recursive=True,
                 walker_callback=(lambda p, s: None),
                 stat=default_stat,
                 listdir=scandir):
        self.recursive = recursive
        self.walker_callback = walker_callback
        self.stat = stat
        self.listdir = listdir

        # path -> stat structure
        self._stat_info = {}
        # (st_ino, st_dev) -> path; a later entry with the same key
        # overwrites an earlier one.
        self._inode_to_path = {}

        # The root itself is part of the snapshot, but it is not reported
        # to ``walker_callback``.
        st = stat(path)
        self._stat_info[path] = st
        self._inode_to_path[(st.st_ino, st.st_dev)] = path

        for p, st in self.walk(path):
            self._inode_to_path[(st.st_ino, st.st_dev)] = p
            self._stat_info[p] = st
            walker_callback(p, st)

    def walk(self, root):
        """
        Generate ``(path, stat_info)`` tuples for the entries below ``root``.

        All entries of a directory are yielded before any of its
        sub-directories is descended into. Sub-directories are only visited
        when ``self.recursive`` is true.

        :param root:
            The directory whose entries should be listed.
        """
        try:
            # ``listdir`` may yield plain names (``os.listdir``; str or
            # bytes) or entry objects carrying a ``name`` attribute
            # (``os.scandir``).
            paths = [os.path.join(root, getattr(entry, 'name', entry))
                     for entry in self.listdir(root)]
        except OSError as e:
            # Directory may have been deleted between finding it in the
            # directory list of its parent and trying to list its contents.
            # If this happens we treat it as empty. Likewise if the directory
            # was replaced with a file of the same name (less likely, but
            # possible).
            if e.errno in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
                return
            raise

        entries = []
        for p in paths:
            try:
                st = self.stat(p)
            except OSError:
                # The entry cannot be stat'ed (for instance because it
                # vanished after being listed); leave it out of the snapshot.
                continue
            entry = (p, st)
            entries.append(entry)
            yield entry

        if self.recursive:
            for path, st in entries:
                try:
                    if S_ISDIR(st.st_mode):
                        for entry in self.walk(path):
                            yield entry
                except (IOError, OSError) as e:
                    # IOError for Python 2
                    # OSError for Python 3
                    # (should be only PermissionError when dropping Python 2 support)
                    # Sub-directories that may not be read are skipped.
                    if e.errno != errno.EACCES:
                        raise

    @property
    def paths(self):
        """
        Set of file/directory paths in the snapshot.

        A new set is built on every access.
        """
        return set(self._stat_info)

    def path(self, id):
        """
        Returns path for id. None if id is unknown to this snapshot.
        """
        return self._inode_to_path.get(id)

    def inode(self, path):
        """ Returns an id for path: the tuple ``(st_ino, st_dev)``. """
        st = self._stat_info[path]
        return (st.st_ino, st.st_dev)

    def isdir(self, path):
        """ Returns whether the recorded ``st_mode`` of path is a directory. """
        return S_ISDIR(self._stat_info[path].st_mode)

    def mtime(self, path):
        """ Returns the recorded ``st_mtime`` of path. """
        return self._stat_info[path].st_mtime

    def size(self, path):
        """ Returns the recorded ``st_size`` of path. """
        return self._stat_info[path].st_size

    def stat_info(self, path):
        """
        Returns a stat information object for the specified path from
        the snapshot.

        Attached information is subject to change. Do not use unless
        you specify `stat` in constructor. Use :func:`inode`, :func:`mtime`,
        :func:`isdir` instead.

        :param path:
            The path for which stat information should be obtained
            from a snapshot.
        """
        return self._stat_info[path]

    def __sub__(self, previous_dirsnap):
        """Allow subtracting a DirectorySnapshot object instance from
        another.

        :returns:
            A :class:`DirectorySnapshotDiff` object.
        """
        return DirectorySnapshotDiff(previous_dirsnap, self)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return str(self._stat_info)
|