# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil; coding: utf-8 -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Helper utility to list things. ATM lists datasets and S3 bucket URLs
"""
__docformat__ = 'restructuredtext'
import humanize
import sys
import string
import time
from os.path import exists, lexists, join as opj, abspath, isabs
from os.path import curdir, isfile, islink, isdir
from os.path import relpath
from os import lstat
from urllib.request import urlopen, Request
from urllib.error import HTTPError
from ..utils import auto_repr
from .base import Interface
from datalad.interface.base import build_doc
from ..ui import ui
from ..utils import Path
from ..utils import safe_print
from ..dochelpers import exc_str
from ..support.param import Parameter
from ..support import ansi_colors
from ..support.constraints import EnsureStr, EnsureNone
from ..distribution.dataset import Dataset
from datalad.support.annexrepo import AnnexRepo
from datalad.support.gitrepo import GitRepo
from datalad.utils import is_interactive
from logging import getLogger
lgr = getLogger('datalad.api.ls')
@build_doc
class Ls(Interface):
"""List summary information about URLs and dataset(s)
ATM only s3:// URLs and datasets are supported
Examples:
$ datalad ls s3://openfmri/tarballs/ds202 # to list S3 bucket
$ datalad ls # to list current dataset
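
The same listing should be reachable from Python as well; a minimal sketch,
assuming this interface is exposed as `ls` in the datalad.api namespace::

>>> import datalad.api as dl
>>> dl.ls('.', recursive=True, long_=True)  # akin to `datalad ls -r -L .`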
"""
# XXX prevent common args from being added to the docstring
_no_eval_results = True
# TODO: refactor this away during the big RF, since it must not be an
# instance attribute. For now it is introduced to make `datalad ls`
# relatively usable in terms of speed
_cached_subdatasets = {}
_params_ = dict(
loc=Parameter(
doc="URL or path to list, e.g. s3://...",
metavar='PATH/URL',
nargs="*",
constraints=EnsureStr() | EnsureNone(),
),
recursive=Parameter(
args=("-r", "--recursive"),
action="store_true",
doc="recurse into subdirectories",
),
fast=Parameter(
args=("-F", "--fast"),
action="store_true",
doc="only perform fast operations. Would be overridden by --all",
),
all_=Parameter(
args=("-a", "--all"),
dest='all_',
action="store_true",
doc="list all (versions of) entries, not e.g. only latest entries "
"in case of S3",
),
long_=Parameter(
args=("-L", "--long"),
dest='long_',
action="store_true",
doc="list more information on entries (e.g. acl, urls in s3, annex "
"sizes etc)",
),
config_file=Parameter(
doc="""path to config file which could help the 'ls'. E.g. for s3://
URLs could be some ~/.s3cfg file which would provide credentials""",
constraints=EnsureStr() | EnsureNone()
),
list_content=Parameter(
choices=(None, 'first10', 'md5', 'full'),
doc="""list also the content or only first 10 bytes (first10), or md5
checksum of an entry. Might require expensive transfer and dump
binary output to your screen. Do not enable unless you know what you
are after""",
default=None
),
json=Parameter(
choices=('file', 'display', 'delete'),
doc="""metadata json of dataset for creating web user interface.
display: prints jsons to stdout or
file: writes each subdir metadata to json file in subdir of dataset or
delete: deletes all metadata json files in dataset""",
),
)
@staticmethod
def __call__(loc, recursive=False, fast=False, all_=False, long_=False,
config_file=None, list_content=False, json=None):
if json:
from datalad.interface.ls_webui import _ls_json
if isinstance(loc, list) and not len(loc):
# nothing given, CWD assumed -- just like regular ls
loc = '.'
kw = dict(fast=fast, recursive=recursive, all_=all_, long_=long_)
if isinstance(loc, list):
return [Ls.__call__(loc_, config_file=config_file,
list_content=list_content, json=json, **kw)
for loc_ in loc]
# TODO: do some clever handling of kwargs so as to remember what the defaults
# were and what any particular implementation actually needs, and then issue a
# warning if some custom value/option was specified which doesn't apply to the
# given url
# note: the trailing underscores in 'all_'/'long_' are there to not anger the
# Python gods who took all the good words
loc_type = "unknown"
if loc.startswith('s3://'):
return _ls_s3(loc, config_file=config_file, list_content=list_content,
**kw)
elif lexists(loc):
if isdir(loc):
ds = Dataset(loc)
if ds.is_installed():
return _ls_json(loc, json=json, **kw) if json else _ls_dataset(loc, **kw)
#loc_type = False
else:
loc_type = "dir" # we know that so far for sure
# it might have been an uninstalled dataset within super-dataset
superds = ds.get_superdataset()
if superds:
try:
subdatasets = Ls._cached_subdatasets[superds.path]
except KeyError:
subdatasets = Ls._cached_subdatasets[superds.path] \
= superds.subdatasets(result_xfm='relpaths')
if relpath(ds.path, superds.path) in subdatasets:
loc_type = "not installed"
else:
loc_type = "file"
# could list properties -- under annex or git, either clean/dirty
# etc
# repo = get_repo_instance(dirname(loc))
if loc_type:
#raise ValueError("ATM supporting only s3:// URLs and paths to local datasets")
# TODO: unify all_ the output here -- _ls functions should just return something
# to be displayed
ui.message(
"{} {}".format(
ansi_colors.color_word(loc, ansi_colors.DATASET),
ansi_colors.color_word(
loc_type,
ansi_colors.RED
if loc_type in {'unknown', 'not installed'}
else ansi_colors.BLUE)
)
)
#
# Dataset listing
#
@auto_repr
class AbsentRepoModel(object):
"""Just a base for those where repo wasn't installed yet"""
def __init__(self, path):
self.path = path
self.repo = None
@property
def type(self):
return "N/A"
@auto_repr
class GitModel(object):
"""A base class for models which have some .repo available"""
__slots__ = ['_branch', 'repo', '_path']
def __init__(self, repo):
self.repo = repo
# lazy evaluation variables
self._branch = None
self._path = None
@property
def path(self):
return self.repo.path if self._path is None else self._path
@path.setter
def path(self, v):
self._path = v
@property
def branch(self):
if self._branch is None:
try:
self._branch = self.repo.get_active_branch()
except Exception:  # MIH: InvalidGitRepositoryError?
return None
return self._branch
@property
def clean(self):
return not self.repo.dirty
@property
def describe(self):
return self.repo.describe(tags=True)
@property
def date(self):
"""Date of the last commit
"""
return self.repo.get_commit_date()
@property
def count_objects(self):
return self.repo.count_objects
@property
def git_local_size(self):
count_objects = self.count_objects
return count_objects['size'] if count_objects else None
@property
def type(self):
return {False: 'git', True: 'annex'}[isinstance(self.repo, AnnexRepo)]
@auto_repr
class AnnexModel(GitModel):
__slots__ = ['_info'] + GitModel.__slots__
def __init__(self, *args, **kwargs):
super(AnnexModel, self).__init__(*args, **kwargs)
self._info = None
@property
def info(self):
if self._info is None and self.type == 'annex':
# we do not care about descriptions - just about sizes etc,
# so to allow RO mode operation - disallow git-annex branch
# merges
self._info = self.repo.repo_info(merge_annex_branches=False)
return self._info
@property
def annex_worktree_size(self):
info = self.info
return info['size of annexed files in working tree'] if info else 0.0
@property
def annex_local_size(self):
info = self.info
return info['local annex size'] if info else 0.0
@auto_repr
class FsModel(AnnexModel):
__slots__ = AnnexModel.__slots__
def __init__(self, path, *args, **kwargs):
super(FsModel, self).__init__(*args, **kwargs)
self._path = Path(path)
@property
def path(self):
return str(self._path)
@property
def symlink(self):
"""if symlink returns path the symlink points to else returns None"""
if self._path.is_symlink(): # if symlink
target_path = self._path.resolve() # find link target
# convert to absolute path if not
return str(target_path) if target_path.exists() else None
return None
@property
def date(self):
"""Date of last modification"""
if self.type_ not in ('git', 'annex'):
return self._path.lstat().st_mtime
else:
return super(FsModel, self).date
@property
def size(self):
"""Size of the node computed based on its type"""
type_ = self.type_
sizes = {'total': 0.0,
'ondisk': 0.0,
'git': 0.0,
'annex': 0.0,
'annex_worktree': 0.0}
if type_ in ['file', 'link', 'link-broken']:
# if node is under annex, ask annex for node size, ondisk_size
if isinstance(self.repo, AnnexRepo) and self.repo.is_under_annex(str(self._path)):
size = self.repo.info(str(self._path), batch=True)['size']
ondisk_size = size \
if self.repo.file_has_content(str(self._path)) \
else 0
# else ask fs for node size (= ondisk_size)
else:
size = ondisk_size = 0 \
if type_ == 'link-broken' \
else lstat(self.symlink or str(self._path)).st_size
sizes.update({'total': size, 'ondisk': ondisk_size})
if self.repo.pathobj == self._path:
sizes.update({'git': self.git_local_size,
'annex': self.annex_local_size,
'annex_worktree': self.annex_worktree_size})
return sizes
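# Illustrative only: for an annexed file whose content is present locally,
# the returned dict would look roughly like
#   {'total': 12345, 'ondisk': 12345, 'git': 0.0, 'annex': 0.0,
#    'annex_worktree': 0.0}
# with the 'git'/'annex' totals only filled in when the node is the
# repository root itself.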
@property
def type_(self):
"""outputs the node type
Types: link, link-broken, file, dir, annex-repo, git-repo"""
if islink(self.path):
return 'link' if self.symlink else 'link-broken'
elif isfile(self.path):
return 'file'
elif exists(opj(self.path, ".git", "annex")):
return 'annex'
elif exists(opj(self.path, ".git")):
return 'git'
elif isdir(self.path):
return 'dir'
else:
return None
class LsFormatter(string.Formatter):
# TODO: we might want to just ignore and force utf8 while explicitly .encode()'ing output!
# ASCII fallbacks; the constructor may reset them to the unicode versions,
# which look better but blow up during tests etc. when the output encoding
# cannot handle them
OK = 'OK' # u"✓"
NOK = 'X' # u"✗"
NONE = '-' # u"✗"
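# Illustrative example (not authoritative) of the custom conversions defined
# in convert_field() below:
#   LsFormatter().format(u"{path!B} {size!S} {clean!X}",
#                        path='ds', size=2048, clean=True)
# would roughly yield a blue "ds", a human-readable "2.0 kB" and a green OK/✓
# mark (exact rendering depends on terminal and color support).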
def __init__(self, *args, **kwargs):
super(LsFormatter, self).__init__(*args, **kwargs)
if sys.stdout.encoding is None:
lgr.debug("encoding not set, using safe alternatives")
elif not sys.stdout.isatty():
lgr.debug("stdout is not a tty, using safe alternatives")
else:
try:
u"✓".encode(sys.stdout.encoding)
except UnicodeEncodeError:
lgr.debug("encoding %s does not support unicode, "
"using safe alternatives",
sys.stdout.encoding)
else:
self.OK = u"✓"
self.NOK = u"✗"
self.NONE = u"✗"
def convert_field(self, value, conversion):
#print("%r->%r" % (value, conversion))
if conversion == 'D': # Date
if value is not None:
return time.strftime(u"%Y-%m-%d/%H:%M:%S", time.localtime(value))
else:
return u'-'
elif conversion == 'S': # Human size
#return value
if value is not None:
return humanize.naturalsize(value)
else:
return u'-'
elif conversion == 'X': # colored bool
if value:
mark, col = self.OK, ansi_colors.GREEN
else:
mark, col = self.NOK, ansi_colors.RED
return ansi_colors.color_word(mark, col)
elif conversion == 'N': # colored Red - if None
if value is None:
# return "%s✖%s" % (self.RED, self.RESET)
return ansi_colors.color_word(self.NONE, ansi_colors.RED)
return value
elif conversion in {'B', 'R', 'U'}:
return ansi_colors.color_word(
value,
{'B': ansi_colors.BLUE,
'R': ansi_colors.RED,
'U': ansi_colors.DATASET}[conversion])
return super(LsFormatter, self).convert_field(value, conversion)
def format_field(self, value, format_spec):
# TODO: move all the "coloring" into formatting, so we could correctly indent
# given the format and only then color it up
# print "> %r, %r" % (value, format_spec)
return super(LsFormatter, self).format_field(value, format_spec)
def format_ds_model(formatter, ds_model, format_str, format_exc):
try:
#print("WORKING ON %s" % ds_model.path)
if not exists(ds_model.path) or not ds_model.repo:
return formatter.format(format_exc, ds=ds_model, msg=u"not installed")
ds_formatted = formatter.format(format_str, ds=ds_model)
#print("FINISHED ON %s" % ds_model.path)
return ds_formatted
except Exception as exc:
return formatter.format(format_exc, ds=ds_model, msg=exc_str(exc))
# from joblib import Parallel, delayed
def _ls_dataset(loc, fast=False, recursive=False, all_=False, long_=False):
isabs_loc = isabs(loc)
topdir = '' if isabs_loc else abspath(curdir)
topds = Dataset(loc)
dss = [topds] + (
[Dataset(opj(loc, sm))
for sm in topds.subdatasets(recursive=recursive, result_xfm='relpaths')]
if recursive else [])
dsms = []
for ds in dss:
if not ds.is_installed():
dsm = AbsentRepoModel(ds.path)
elif isinstance(ds.repo, AnnexRepo):
dsm = AnnexModel(ds.repo)
elif isinstance(ds.repo, GitRepo):
dsm = GitModel(ds.repo)
else:
raise RuntimeError("Got some dataset which don't know how to handle %s"
% ds)
dsms.append(dsm)
# adjust path strings
for ds_model in dsms:
#path = ds_model.path[len(topdir) + 1 if topdir else 0:]
path = relpath(ds_model.path, topdir) if topdir else ds_model.path
if not path:
path = '.'
ds_model.path = path
dsms = sorted(dsms, key=lambda m: m.path)
maxpath = max(len(ds_model.path) for ds_model in dsms)
path_fmt = u"{ds.path!U:<%d}" % (maxpath + (11 if is_interactive() else 0)) # + to accommodate ansi codes
pathtype_fmt = path_fmt + u" [{ds.type}]"
full_fmt = pathtype_fmt + u" {ds.branch!N} {ds.describe!N} {ds.date!D}"
if (not fast) or long_:
full_fmt += u" {ds.clean!X}"
fmts = {
AbsentRepoModel: pathtype_fmt,
GitModel: full_fmt,
AnnexModel: full_fmt
}
if long_:
fmts[AnnexModel] += u" {ds.annex_local_size!S}/{ds.annex_worktree_size!S}"
formatter = LsFormatter()
# weird problems happen in the parallel run -- TODO - figure it out
# for out in Parallel(n_jobs=1)(
# delayed(format_ds_model)(formatter, dsm, full_fmt, format_exc=path_fmt + " {msg!R}")
# for dsm in dss):
# print(out)
for dsm in dsms:
fmt = fmts[dsm.__class__]
ds_str = format_ds_model(formatter, dsm, fmt, format_exc=path_fmt + u" {msg!R}")
safe_print(ds_str)
# workaround for explosion of git cat-file --batch processes
# https://github.com/datalad/datalad/issues/1888
if dsm.repo is not None:
del dsm.repo
dsm.repo = None
#
# S3 listing
#
def _ls_s3(loc, fast=False, recursive=False, all_=False, long_=False,
config_file=None, list_content=False):
"""List S3 bucket content"""
if loc.startswith('s3://'):
bucket_prefix = loc[5:]
else:
raise ValueError("passed location should be an s3:// url")
import boto
from hashlib import md5
from boto.s3.key import Key
from boto.s3.prefix import Prefix
from boto.s3.connection import OrdinaryCallingFormat
from boto.exception import S3ResponseError
from ..support.configparserinc import SafeConfigParser # provides PY2,3 imports
if '/' in bucket_prefix:
bucket_name, prefix = bucket_prefix.split('/', 1)
else:
bucket_name, prefix = bucket_prefix, None
if prefix and '?' in prefix:
ui.message("We do not care about URL options ATM, they get stripped")
prefix = prefix[:prefix.index('?')]
ui.message("Connecting to bucket: %s" % bucket_name)
if config_file:
config = SafeConfigParser()
config.read(config_file)
access_key = config.get('default', 'access_key')
secret_key = config.get('default', 'secret_key')
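# For illustration, the expected ini-style layout of such a config file
# (e.g. a minimal ~/.s3cfg), based on the lookups above, would be:
#   [default]
#   access_key = <your access key>
#   secret_key = <your secret key>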
# TODO: remove duplication -- reuse logic within downloaders/s3.py to get connected
kwargs = {}
if '.' in bucket_name:
kwargs['calling_format'] = OrdinaryCallingFormat()
conn = boto.connect_s3(access_key, secret_key, **kwargs)
try:
bucket = conn.get_bucket(bucket_name)
except S3ResponseError as e:
ui.message("E: Cannot access bucket %s by name" % bucket_name)
all_buckets = conn.get_all_buckets()
all_bucket_names = [b.name for b in all_buckets]
ui.message("I: Found following buckets %s" % ', '.join(all_bucket_names))
if bucket_name in all_bucket_names:
bucket = all_buckets[all_bucket_names.index(bucket_name)]
else:
raise RuntimeError("E: no bucket named %s thus exiting" % bucket_name)
else:
# TODO: expose credentials
# We don't need any provider here really but only credentials
from datalad.downloaders.providers import Providers
providers = Providers.from_config_files()
provider = providers.get_provider(loc)
if not provider:
raise ValueError(
"Do not know how to deal with url %s -- no provider is defined for it. "
"Define a new provider (DOCS: TODO) or just specify an s3cmd config file instead for now."
% loc
)
downloader = provider.get_downloader(loc)
# should authenticate etc, and when ready we will ask for a bucket ;)
bucket = downloader.access(lambda url: downloader.bucket, loc)
info = []
for iname, imeth in [
("Versioning", bucket.get_versioning_status),
(" Website", bucket.get_website_endpoint),
(" ACL", bucket.get_acl),
]:
try:
ival = imeth()
except Exception as e:
ival = str(e).split('\n')[0]
info.append(" {iname}: {ival}".format(**locals()))
ui.message("Bucket info:\n %s" % '\n '.join(info))
kwargs = {} if recursive else {'delimiter': '/'}
ACCESS_METHODS = [
bucket.list_versions,
bucket.list
]
prefix_all_versions = None
got_versioned_list = False
for acc in ACCESS_METHODS:
try:
prefix_all_versions = list(acc(prefix, **kwargs))
got_versioned_list = acc is bucket.list_versions
break
except Exception as exc:
lgr.debug("Failed to access via %s: %s", acc, exc_str(exc))
if not prefix_all_versions:
ui.error("No output was provided for prefix %r" % prefix)
else:
max_length = max((len(e.name) for e in prefix_all_versions))
max_size_length = max((len(str(getattr(e, 'size', 0))) for e in prefix_all_versions))
results = []
for e in prefix_all_versions:
results.append(e)
if isinstance(e, Prefix):
ui.message("%s" % (e.name, ),)
continue
base_msg = ("%%-%ds %%s" % max_length) % (e.name, e.last_modified)
if isinstance(e, Key):
if got_versioned_list and not (e.is_latest or all_):
lgr.debug(
"Skipping Key since not all versions requested: %s", e)
# Skip this one
continue
ui.message(base_msg + " %%%dd" % max_size_length % e.size, cr=' ')
# OPT: delayed import
from ..support.s3 import get_key_url
url = get_key_url(e, schema='http')
try:
_ = urlopen(Request(url))
urlok = "OK"
except HTTPError as err:
urlok = "E: %s" % err.code
try:
acl = e.get_acl()
except S3ResponseError as exc:
acl = exc.code if exc.code in ('AccessDenied',) else str(exc)
content = ""
if list_content:
# IO intensive, make an option finally!
try:
# _ = e.next()[:5] if we are able to fetch the content
kwargs = dict(version_id=e.version_id)
if list_content in {'full', 'first10'}:
if list_content == 'first10':
kwargs['headers'] = {'Range': 'bytes=0-9'}
content = repr(e.get_contents_as_string(**kwargs))
elif list_content == 'md5':
digest = md5()
digest.update(e.get_contents_as_string(**kwargs))
content = digest.hexdigest()
else:
raise ValueError(list_content)
# content = "[S3: OK]"
except S3ResponseError as err:
content = str(err)
finally:
content = " " + content
ui.message(
"ver:%-32s acl:%s %s [%s]%s"
% (getattr(e, 'version_id', None),
acl, url, urlok, content)
if long_ else ''
)
else:
ui.message(base_msg + " " + str(type(e)).split('.')[-1].rstrip("\"'>"))
return results