[Bf-blender-cvs] [0da8bae] asset-engine: Claude: Fix asyncio handling!

Bastien Montagne noreply at git.blender.org
Tue Sep 6 14:44:06 CEST 2016


Commit: 0da8bae85ac41a949048437f0f53ea705584a43f
Author: Bastien Montagne
Date:   Tue Sep 6 14:38:51 2016 +0200
Branches: asset-engine
https://developer.blender.org/rB0da8bae85ac41a949048437f0f53ea705584a43f

Claude: Fix asyncio handling!

Turns out using worker thread to run asyncio loop is not working with pillar, this code is totally
not blender-independent and hence cannot safely work in another thread.

So instead using same technique as in AmberAIO - 'stepping' of asyncio loop, instead of blindly
run_forever() and hoping loop will stop some day...

Note that we may rather call e.g. loop.run_later(1e-5, loop.stop), to actually give a bit more
time to the loop... Time will say what's best option here.

===================================================================

M	release/scripts/startup/bl_operators/claude/__init__.py

===================================================================

diff --git a/release/scripts/startup/bl_operators/claude/__init__.py b/release/scripts/startup/bl_operators/claude/__init__.py
index 17ab18c..20f486d 100644
--- a/release/scripts/startup/bl_operators/claude/__init__.py
+++ b/release/scripts/startup/bl_operators/claude/__init__.py
@@ -79,41 +79,37 @@ REQUIRED_ROLES_FOR_CLAUDE = {'subscriber', 'demo'}
 # Claude Jobs.
 class ClaudeJob:
     @staticmethod
-    async def check_loop(loop, evt_cancel):
-        """Async function merely endlessly checking 'cancel' flag is not set, and stopping the loop in case it is..."""
-        while True:
-            if evt_cancel.is_set():
-                print("CAAANNNNNNCCCEEEEEEEELLLLLEDDDDDDDD!")
-                loop.stop()
-                return
-            await asyncio.sleep(1e-6)
-
-    @classmethod
-    def run_loop(cls, loop, evt_cancel):
-        """Function called in worker thread, sets the loop for current thread, and runs it."""
-        asyncio.set_event_loop(loop)
-        asyncio.ensure_future(cls.check_loop(loop, evt_cancel))
-        loop.run_forever()
-        loop.close()
+    def async_looper(func):
+        def wrapper(self, *args, **kwargs):
+            loop = self.loop
+            assert(not self.loop.is_running())
+            print("proceed....")
+            ret = func(self, *args, **kwargs)
+            if not self.evt_cancel.is_set():
+                print("kickstep")
+                # This forces loop to only do 'one step', and return (hopefully!) very soon.
+                loop.call_soon(loop.stop)
+                loop.run_forever()
+            else:
+                print("canceled")
+            return ret
+        return wrapper
 
     def cancel(self):
+        print("Cancelling ", self)
         self.evt_cancel.set()
-        self.running_loop.cancel()
-        futures.wait((self.running_loop,))
-
-    def __init__(self, executor, job_id):
-        self.executor = executor
+        assert(not self.loop.is_running())
 
+    def __init__(self, job_id):
         self.job_id = job_id
         self.status = {'VALID'}
         self.progress = 0.0
 
-        self.loop = asyncio.new_event_loop()
-        #~ self.loop.set_default_executor(executor)
         self.evt_cancel = threading.Event()
-        self.running_loop = self.executor.submit(self.run_loop, self.loop, self.evt_cancel)
+        self.loop = asyncio.get_event_loop()
 
     def __del__(self):
+        print("deleting ", self)
         self.cancel()
 
 class ClaudeJobCheckCredentials(ClaudeJob):
@@ -124,7 +120,9 @@ class ClaudeJobCheckCredentials(ClaudeJob):
         Returns None if the user cannot be found, or if the user is not a Cloud subscriber.
         """
         try:
+            print("Awaiting pillar.check_pillar_credentials...")
             user_id = await pillar.check_pillar_credentials(REQUIRED_ROLES_FOR_CLAUDE)
+            print("Done pillar.check_pillar_credentials...")
         except pillar.NotSubscribedToCloudError:
             print('Not subsribed.')
             return None
@@ -137,7 +135,9 @@ class ClaudeJobCheckCredentials(ClaudeJob):
             return user_id
 
         try:
+            print("awaiting pillar.refresh_pillar_credentials...")
             user_id = await pillar.refresh_pillar_credentials(required_roles)
+            print("Done pillar.refresh_pillar_credentials...")
         except pillar.NotSubscribedToCloudError:
             print('Not subsribed.')
             return None
@@ -151,22 +151,21 @@ class ClaudeJobCheckCredentials(ClaudeJob):
 
         return None
 
+    @ClaudeJob.async_looper
     def start(self):
-        self.check_task = asyncio.run_coroutine_threadsafe(self.check(), self.loop)
+        self.check_task = asyncio.ensure_future(self.check())
         self.progress = 0.0
         self.status = {'VALID', 'RUNNING'}
 
+    @ClaudeJob.async_looper
     def update(self):
-        if self.evt_cancel.is_set():
-            self.cancel()
-            return
-
         self.status = {'VALID', 'RUNNING'}
         user_id = ...
+        print("Updating credential status, ", self.check_task, " | Done: ", self.check_task.done() if self.check_task else "")
         if self.check_task is not None:
             if not self.check_task.done():
                 return ...
-            print("cred check finished, we should have cloud access...")
+            print("Cred check finished, we should have cloud access...")
             user_id = self.check_task.result()
             print(user_id)
             self.check_task = None
@@ -177,14 +176,11 @@ class ClaudeJobCheckCredentials(ClaudeJob):
         return user_id
 
     def cancel(self):
-        print("CANCELLING...")
         super().cancel()
-        if self.check_task is not None and not self.check_task.done():
-            self.check_task.cancel()
         self.status = {'VALID'}
 
-    def __init__(self, executor, job_id):
-        super().__init__(executor, job_id)
+    def __init__(self, job_id):
+        super().__init__(job_id)
 
         self.check_task = None
 
@@ -193,26 +189,21 @@ class ClaudeJobCheckCredentials(ClaudeJob):
 
 class ClaudeJobList(ClaudeJob):
     @staticmethod
-    async def ls(evt_cancel, node):
-        if evt_cancel.is_set():
-            return None
-
+    async def ls(node):
         print("we should be listing Cloud content from Node ", node, "...")
         await asyncio.sleep(1)
 
         return ["..", "a", "b"]
 
+    @ClaudeJob.async_looper
     def start(self):
         self.nbr = 0
         self.tot = 0
-        self.ls_task = asyncio.run_coroutine_threadsafe(self.ls(self.evt_cancel, self.curr_node), self.loop)
+        self.ls_task = asyncio.ensure_future(self.ls(self.evt_cancel, self.curr_node))
         self.status = {'VALID', 'RUNNING'}
 
+    @ClaudeJob.async_looper
     def update(self, user_id, dirs):
-        if self.evt_cancel.is_set():
-            self.cancel()
-            return
-
         self.status = {'VALID', 'RUNNING'}
 
         if user_id is None:
@@ -240,12 +231,10 @@ class ClaudeJobList(ClaudeJob):
     def cancel(self):
         print("CANCELLING...")
         super().cancel()
-        if self.ls_task is not None and not self.ls_task.done():
-            self.ls_task.cancel()
         self.status = {'VALID'}
 
-    def __init__(self, executor, job_id, user_id, curr_node):
-        super().__init__(executor, job_id)
+    def __init__(self, job_id, user_id, curr_node):
+        super().__init__(job_id)
         self.curr_node = curr_node
 
         self.ls_task = None
@@ -324,7 +313,6 @@ class AssetEngineClaude(AssetEngine):
     bl_version = (0 << 16) + (0 << 8) + 1  # Usual maj.min.rev version scheme...
 
     def __init__(self):
-        self.executor = futures.ThreadPoolExecutor(8)  # Using threads for now, if issues arise we'll switch to process.
         self.jobs = {}
 
         self.job_uuid = 1
@@ -358,10 +346,10 @@ class AssetEngineClaude(AssetEngine):
         if self.job_id_check_credentials is ...:
             self.job_id_check_credentials = job_id = self.job_uuid
             self.job_uuid += 1
-            job = self.jobs[job_id] = ClaudeJobCheckCredentials(self.executor, job_id)
+            job = self.jobs[job_id] = ClaudeJobCheckCredentials(job_id)
         else:
-            print(self.job_id_check_credentials)
             job = self.jobs[self.job_id_check_credentials]
+        print("main check_credentials: ", job)
         if job is not None:
             self.user_id = job.update()
             if self.user_id is not ...:
@@ -450,7 +438,7 @@ class AssetEngineClaude(AssetEngine):
         if job is not None and isinstance(job, ClaudeJobList):
             if job.curr_node != entries.root_path:
                 self.reset()
-                self.jobs[job_id] = ClaudeJobList(self.executor, job_id, user_id, entries.root_path)
+                job = self.jobs[job_id] = ClaudeJobList(self.executor, job_id, user_id, entries.root_path)
                 self.root = entries.root_path
         elif self.root != entries.root_path:
             self.reset()




More information about the Bf-blender-cvs mailing list