[Bf-blender-cvs] [991194f] asset-engine: Add experimental 'asyncio-based' version of Amber.

Bastien Montagne noreply at git.blender.org
Sat Aug 27 15:04:06 CEST 2016


Commit: 991194f49de6b668d2a8cb12d47d5823c693257a
Author: Bastien Montagne
Date:   Sat Aug 27 14:27:44 2016 +0200
Branches: asset-engine
https://developer.blender.org/rB991194f49de6b668d2a8cb12d47d5823c693257a

Add experimental 'asyncio-based' version of Amber.

This was done both to test asyncio usage in an 'asset engine' context, and to get my feet wet with this library.

All in all, I am really not convinced by asyncio in this use-case. async and coroutines are much more
complex to grasp and get working correctly (and efficiently!) than mere threading.
Furthermore, most Python library functions (os.listdir, os.lstat, etc.) are not async-ready,
so you have to call them through asyncio's executor (aka threads or processes),
which means using asyncio in this context is only adding overhead.

I think asyncio is only really relevant when you have to handle tens (and many more) of concurrent I/O tasks;
otherwise threading (or multi-processing if compute-intensive) is much, much simpler to get working,
and also probably lighter in terms of overhead.

Furthermore, the asyncio paradigm is not so easy to adapt to the 'AssetEngine' paradigm, i.e. a
Python interface that gets called to start, query and finalize jobs that are supposed to keep running
in-between (the event loop should probably run in a background thread, actually...).

===================================================================

M	release/scripts/startup/bl_operators/__init__.py
A	release/scripts/startup/bl_operators/amber_asyncio.py

===================================================================

diff --git a/release/scripts/startup/bl_operators/__init__.py b/release/scripts/startup/bl_operators/__init__.py
index 06010f6..1fb7868 100644
--- a/release/scripts/startup/bl_operators/__init__.py
+++ b/release/scripts/startup/bl_operators/__init__.py
@@ -52,6 +52,7 @@ _modules = [
     ]
 
 _modules.append("amber")
+_modules.append("amber_asyncio")
 
 import bpy
 
diff --git a/release/scripts/startup/bl_operators/amber_asyncio.py b/release/scripts/startup/bl_operators/amber_asyncio.py
new file mode 100644
index 0000000..37a28c2
--- /dev/null
+++ b/release/scripts/startup/bl_operators/amber_asyncio.py
@@ -0,0 +1,840 @@
+# ##### BEGIN GPL LICENSE BLOCK #####
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU General Public License
+#  as published by the Free Software Foundation; either version 2
+#  of the License, or (at your option) any later version.
+#
+#  This program is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#  GNU General Public License for more details.
+#
+#  You should have received a copy of the GNU General Public License
+#  along with this program; if not, write to the Free Software Foundation,
+#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# ##### END GPL LICENSE BLOCK #####
+
+# <pep8 compliant>
+
+# Note: This will be a simple addon later, but until it gets to master, it's simpler to have it
+#       as a startup module!
+
+import bpy
+from bpy.types import (
+        AssetEngine,
+        Panel,
+        PropertyGroup,
+        UIList,
+        )
+from bpy.props import (
+        StringProperty,
+        BoolProperty,
+        IntProperty,
+        FloatProperty,
+        EnumProperty,
+        CollectionProperty,
+        )
+
+import binascii
+import concurrent.futures as futures
+import asyncio
+import hashlib
+import json
+import os
+import stat
+import struct
+import time
+import random
+
+
# File name of the per-repository Amber database, and the db key holding its format version.
AMBER_DB_NAME = "__amber_db.json"
AMBER_DBK_VERSION = "version"
+
+
+##########
+# Helpers.
+
+# Notes about UUIDs:
+#    * UUID of an asset/variant/revision is computed once at its creation! Later changes to data do not affect it.
+#    * Collision, for unlikely it is, may happen across different repositories...
+#      Doubt this will be practical issue though.
+#    * We keep eight first bytes of 'clear' identifier, to (try to) keep some readable uuid.
+
+def _uuid_gen_single(used_uuids, uuid_root, h, str_arg):
+    h.update(str_arg.encode())
+    uuid = uuid_root + h.digest()
+    uuid = uuid[:23].replace(b'\0', b'\1')  # No null chars, RNA 'bytes' use them as in regular strings... :/
+    if uuid not in used_uuids:  # *Very* likely, but...
+        used_uuids.add(uuid)
+        return uuid
+    return None
+
+
def _uuid_gen(used_uuids, uuid_root, bytes_seed, *str_args):
    """Generate a new unique uuid from a bytes seed and one or more string parts.

    Each str_arg is folded into the hash in turn until an unused uuid comes out;
    a counter-based fallback handles the (practically impossible) case where all
    of them collide with already-used uuids.
    """
    h = hashlib.md5(bytes_seed)
    for arg in str_args:
        uuid = _uuid_gen_single(used_uuids, uuid_root, h, arg)
        if uuid is not None:
            return uuid
    # This is a fallback in case we'd get a collision... Should never be needed in real life!
    for i in range(100000):
        # _uuid_gen_single() encodes its argument itself, so it must be given a str;
        # the original passed raw bytes (i.to_bytes(...)), which raised AttributeError.
        uuid = _uuid_gen_single(used_uuids, uuid_root, h, str(i))
        if uuid is not None:
            return uuid
    return None  # If this happens...
+
+
def uuid_asset_gen(used_uuids, path_db, name, tags):
    """Generate a unique uuid for a new asset, seeded by its db path, name and tags.

    Note: must go through _uuid_gen() (which builds the hash object); the original
    called _uuid_gen_single() directly, passing a bytes seed where a hash object
    was expected, plus extra positional args.
    """
    uuid_root = name.encode()[:8] + b'|'
    return _uuid_gen(used_uuids, uuid_root, path_db.encode(), name, *tags)
+
+
def uuid_variant_gen(used_uuids, asset_uuid, name):
    """Generate a unique uuid for a new variant, seeded by its asset's uuid and its name.

    Note: must go through _uuid_gen(); the original called _uuid_gen_single()
    with the bytes asset_uuid in place of the expected hash object.
    """
    uuid_root = name.encode()[:8] + b'|'
    return _uuid_gen(used_uuids, uuid_root, asset_uuid, name)
+
+
def uuid_revision_gen(used_uuids, variant_uuid, number, size, timestamp):
    """Generate a unique uuid for a new revision, seeded by its variant's uuid, number, size and timestamp.

    Fixes: the body referenced an undefined name 'timestamp' while the parameter
    was called 'time' (which also shadowed the imported time module) — the
    parameter is renamed to match (positional callers are unaffected). Also must
    go through _uuid_gen(), not _uuid_gen_single() (same wrong-helper bug as its
    siblings).
    """
    uuid_root = str(number).encode() + b'|'
    return _uuid_gen(used_uuids, uuid_root, variant_uuid, str(number), str(size), str(timestamp))
+
+
def uuid_unpack_bytes(uuid_bytes):
    """Unpack an uuid given as raw bytes (up to 16) into a tuple of four big-endian int32."""
    padded = uuid_bytes.ljust(16, b'\0')
    return struct.unpack("!iiii", padded)
+
+
def uuid_unpack(uuid_hexstr):
    """Unpack an uuid given as a hexadecimal string into a tuple of four big-endian int32."""
    raw = binascii.unhexlify(uuid_hexstr)
    return struct.unpack("!iiii", raw.ljust(16, b'\0'))
+
+
def uuid_unpack_asset(uuid_repo_hexstr, uuid_asset_hexstr):
    """Combine a repository hex uuid and an asset hex uuid (8 bytes each, null-padded)
    into a single 16-byte uuid, returned as a tuple of four big-endian int32."""
    raw = (binascii.unhexlify(uuid_repo_hexstr).ljust(8, b'\0') +
           binascii.unhexlify(uuid_asset_hexstr).ljust(8, b'\0'))
    return struct.unpack("!iiii", raw.ljust(16, b'\0'))
+
+
def uuid_pack(uuid_iv4):
    """Pack an uuid given as a tuple of four big-endian int32 back into a hexadecimal bytes string.

    Inverse of uuid_unpack(). A leftover debug print() was removed: it spammed
    the console on every call (e.g. each repo save).
    """
    return binascii.hexlify(struct.pack("!iiii", *uuid_iv4))
+
+
# XXX Hack, once this becomes a real addon we'll just use addons' config system, for now store that in some own config.
amber_repos_path = os.path.join(bpy.utils.user_resource('CONFIG', create=True), "amber_repos.json")
amber_repos = None
if not os.path.exists(amber_repos_path):
    # First run: create an empty mapping so the load below always succeeds.
    with open(amber_repos_path, 'w') as ar_f:
        json.dump({}, ar_f)
with open(amber_repos_path, 'r') as ar_f:
    # Stored keys are hex strings; convert them back to int32-tuple uuids for in-memory use.
    amber_repos = {uuid_unpack(uuid): path for uuid, path in json.load(ar_f).items()}
assert amber_repos is not None  # Identity check ('is not None'), not '!= None'.
+
+
def save_amber_repos():
    """Serialize the global amber_repos mapping back to its JSON config file,
    converting the int32-tuple uuid keys back to hex strings."""
    serialized = {uuid_pack(uuid).decode(): path for uuid, path in amber_repos.items()}
    with open(amber_repos_path, 'w') as ar_f:
        json.dump(serialized, ar_f)
+
+
+#############
+# Amber Jobs.
class AmberJob:
    """Base class for Amber asset-engine jobs driven by the asyncio event loop."""

    def __init__(self, job_id):
        # A future tied to the current loop lets callers cancel the whole job.
        self.job_id = job_id
        self.job_future = asyncio.get_event_loop().create_future()
        self.status = {'VALID'}
        self.progress = 0.0

    @staticmethod
    def async_looper(func):
        """Decorator: pause the (forever-running) event loop, run func synchronously,
        then resume the loop — lets sync engine callbacks drive async tasks."""
        def wrapper(*args, **kwargs):
            event_loop = asyncio.get_event_loop()
            print("kickstop")
            event_loop.stop()
            print("proceed....")
            func(*args, **kwargs)
            print("kickstart")
            event_loop.run_forever()
        return wrapper
+
+
+class AmberJobList(AmberJob):
    @staticmethod
    async def ls_repo(job_future, db_path):
        """Load and validate the Amber repository db (JSON) at db_path.

        Returns the repo dict with every hex-string uuid converted to an
        int32-tuple, or None when db_path is not a supported Amber db
        (wrong version, not a dict) or the job was already cancelled.
        """
        if job_future.cancelled():
            return None

        loop = asyncio.get_event_loop()
        repo = None
        # json.load blocks, so it runs in the loop's default executor (thread pool).
        with open(db_path, 'r') as db_f:
            repo = await loop.run_in_executor(None, json.load, db_f)
        if isinstance(repo, dict):
            repo_ver = repo.get(AMBER_DBK_VERSION, "")
            if repo_ver != "1.0.1":
                # Unsupported...
                print("WARNING: unsupported Amber repository version '%s'." % repo_ver)
                repo = None
        else:
            repo = None
        if repo is not None:
            # Convert hexa string to array of four uint32...
            # XXX will have to check endianess mess here, for now always use same one ('network' one).
            repo_uuid = repo["uuid"]
            repo["uuid"] = uuid_unpack(repo_uuid)
            new_entries = {}
            for euuid, e in repo["entries"].items():
                new_variants = {}
                for vuuid, v in e["variants"].items():
                    new_revisions = {}
                    for ruuid, r in v["revisions"].items():
                        new_revisions[uuid_unpack(ruuid)] = r
                    # v is stored by reference, so mutating it after insertion is fine.
                    new_variants[uuid_unpack(vuuid)] = v
                    v["revisions"] = new_revisions
                    ruuid = v["revision_default"]
                    v["revision_default"] = uuid_unpack(ruuid)
                # Asset keys embed the repository uuid (first 8 bytes) for global uniqueness.
                new_entries[uuid_unpack_asset(repo_uuid, euuid)] = e
                e["variants"] = new_variants
                vuuid = e["variant_default"]
                e["variant_default"] = uuid_unpack(vuuid)
            repo["entries"] = new_entries
        #~ print(repo)
        return repo
+
+    @staticmethod
+    async def ls(job_future, path):
+        if job_future.cancelled():
+            return None
+
+        print(1)
+        loop = asyncio.get_event_loop()
+        repo = None
+        ret = [".."]
+        print(2)
+        tmp = await loop.run_in_executor(None, os.listdir, path)
+        print(3)
+        if AMBER_DB_NAME in tmp:
+            # That dir is an Amber repo, we only list content define by our amber 'db'.
+            repo = await AmberJobList.ls_repo(job_future, os.path.join(path, AMBER_DB_NAME))
+        if repo is None:
+            ret += tmp
+        #~ time.sleep(0.1)  # 100% Artificial Lag (c)
+        print(4)
+        return ret, repo
+
+    @staticmethod
+    async def stat(job_future, root, path):
+        if job_future.cancelled():
+            return None
+
+        loop = asyncio.get_event_loop()
+        st = await loop.run_in_executor(None, os.lstat, root + path)
+        #~ time.sleep(0.1)  # 100% Artificial Lag (c)
+        return path, (stat.S_ISDIR(st.st_mode), st.st_size, st.st_mtime)
+
    @AmberJob.async_looper
    def start(self):
        # Kick off the initial directory listing; per-entry stat tasks are
        # spawned later by update() once the listing is done.
        # NOTE(review): assumes self.root and self.stat_tasks are initialized by
        # code outside this view (likely __init__) — confirm.
        self.nbr = 0
        self.tot = 0
        self.ls_task = asyncio.ensure_future(self.ls(self.job_future, self.root))
        self.status = {'VALID', 'RUNNING'}
+
    @AmberJob.async_looper
    def update(self, repository, dirs):
        # Called repeatedly to pump async results into the caller's containers:
        # first harvests the listing task (refilling repository/dirs and spawning
        # one stat task per entry), then drains whatever stat tasks have finished.
        if self.job_future.cancelled():
            # NOTE(review): self.cancel is defined outside this view — confirm it exists.
            self.cancel()
            return

        self.status = {'VALID', 'RUNNING'}
        if self.ls_task is not None:
            if not self.ls_task.done():
                return  # Listing still in flight; nothing to harvest yet.
            paths, repo = self.ls_task.result()
            self.ls_task = None
            self.tot = len(paths)  # >= 1, since ls() always prepends "..".
            repository.clear()
            dirs.clear()
            if repo is not None:
                repository.update(repo)
            for p in paths:
                self.stat_tasks.add(asyncio.ensure_future(self.stat(self.job_future, self.root, p)))

        # Drain finished stat tasks; unfinished ones stay for the next update() call.
        done = set()
        for tsk in self.stat_tasks:
            if tsk.done():
                path, (is_dir, size, timestamp) = tsk.result()
                self.nbr += 1
                if is_dir:
                    # We only list dirs from real file system.
                    # Synthesize a uuid from the name prefix plus a per-job counter.
                    uuid = uuid_unpack_bytes((path.encode()[:8] + b"|" + self.nbr.to_bytes(4, 'little')))
                    dirs.append((path, size, timestamp, uuid))
                done.add(tsk)
        self.stat_tasks -= done

        self.progress = self.nbr / self.tot
+ 

@@ Diff output truncated at 10240 characters. @@




More information about the Bf-blender-cvs mailing list