[Bf-blender-cvs] [0da8bae] asset-engine: Claude: Fix asyncio handling!
Bastien Montagne
noreply at git.blender.org
Tue Sep 6 14:44:06 CEST 2016
Commit: 0da8bae85ac41a949048437f0f53ea705584a43f
Author: Bastien Montagne
Date: Tue Sep 6 14:38:51 2016 +0200
Branches: asset-engine
https://developer.blender.org/rB0da8bae85ac41a949048437f0f53ea705584a43f
Claude: Fix asyncio handling!
Turns out using worker thread to run asyncio loop is not working with pillar, this code is totally
not blender-independent and hence cannot safely work in another thread.
So instead using same technique as in AmberAIO - 'stepping' of asyncio loop, instead of blindly
run_forever() and hoping loop will stop some day...
Note that we may rather call e.g. loop.run_later(1e-5, loop.stop), to actually give a bit more
time to the loop... Time will say what's best option here.
===================================================================
M release/scripts/startup/bl_operators/claude/__init__.py
===================================================================
diff --git a/release/scripts/startup/bl_operators/claude/__init__.py b/release/scripts/startup/bl_operators/claude/__init__.py
index 17ab18c..20f486d 100644
--- a/release/scripts/startup/bl_operators/claude/__init__.py
+++ b/release/scripts/startup/bl_operators/claude/__init__.py
@@ -79,41 +79,37 @@ REQUIRED_ROLES_FOR_CLAUDE = {'subscriber', 'demo'}
# Claude Jobs.
class ClaudeJob:
@staticmethod
- async def check_loop(loop, evt_cancel):
- """Async function merely endlessly checking 'cancel' flag is not set, and stopping the loop in case it is..."""
- while True:
- if evt_cancel.is_set():
- print("CAAANNNNNNCCCEEEEEEEELLLLLEDDDDDDDD!")
- loop.stop()
- return
- await asyncio.sleep(1e-6)
-
- @classmethod
- def run_loop(cls, loop, evt_cancel):
- """Function called in worker thread, sets the loop for current thread, and runs it."""
- asyncio.set_event_loop(loop)
- asyncio.ensure_future(cls.check_loop(loop, evt_cancel))
- loop.run_forever()
- loop.close()
+ def async_looper(func):
+ def wrapper(self, *args, **kwargs):
+ loop = self.loop
+ assert(not self.loop.is_running())
+ print("proceed....")
+ ret = func(self, *args, **kwargs)
+ if not self.evt_cancel.is_set():
+ print("kickstep")
+ # This forces loop to only do 'one step', and return (hopefully!) very soon.
+ loop.call_soon(loop.stop)
+ loop.run_forever()
+ else:
+ print("canceled")
+ return ret
+ return wrapper
def cancel(self):
+ print("Cancelling ", self)
self.evt_cancel.set()
- self.running_loop.cancel()
- futures.wait((self.running_loop,))
-
- def __init__(self, executor, job_id):
- self.executor = executor
+ assert(not self.loop.is_running())
+ def __init__(self, job_id):
self.job_id = job_id
self.status = {'VALID'}
self.progress = 0.0
- self.loop = asyncio.new_event_loop()
- #~ self.loop.set_default_executor(executor)
self.evt_cancel = threading.Event()
- self.running_loop = self.executor.submit(self.run_loop, self.loop, self.evt_cancel)
+ self.loop = asyncio.get_event_loop()
def __del__(self):
+ print("deleting ", self)
self.cancel()
class ClaudeJobCheckCredentials(ClaudeJob):
@@ -124,7 +120,9 @@ class ClaudeJobCheckCredentials(ClaudeJob):
Returns None if the user cannot be found, or if the user is not a Cloud subscriber.
"""
try:
+ print("Awaiting pillar.check_pillar_credentials...")
user_id = await pillar.check_pillar_credentials(REQUIRED_ROLES_FOR_CLAUDE)
+ print("Done pillar.check_pillar_credentials...")
except pillar.NotSubscribedToCloudError:
print('Not subsribed.')
return None
@@ -137,7 +135,9 @@ class ClaudeJobCheckCredentials(ClaudeJob):
return user_id
try:
+ print("awaiting pillar.refresh_pillar_credentials...")
user_id = await pillar.refresh_pillar_credentials(required_roles)
+ print("Done pillar.refresh_pillar_credentials...")
except pillar.NotSubscribedToCloudError:
print('Not subsribed.')
return None
@@ -151,22 +151,21 @@ class ClaudeJobCheckCredentials(ClaudeJob):
return None
+ @ClaudeJob.async_looper
def start(self):
- self.check_task = asyncio.run_coroutine_threadsafe(self.check(), self.loop)
+ self.check_task = asyncio.ensure_future(self.check())
self.progress = 0.0
self.status = {'VALID', 'RUNNING'}
+ @ClaudeJob.async_looper
def update(self):
- if self.evt_cancel.is_set():
- self.cancel()
- return
-
self.status = {'VALID', 'RUNNING'}
user_id = ...
+ print("Updating credential status, ", self.check_task, " | Done: ", self.check_task.done() if self.check_task else "")
if self.check_task is not None:
if not self.check_task.done():
return ...
- print("cred check finished, we should have cloud access...")
+ print("Cred check finished, we should have cloud access...")
user_id = self.check_task.result()
print(user_id)
self.check_task = None
@@ -177,14 +176,11 @@ class ClaudeJobCheckCredentials(ClaudeJob):
return user_id
def cancel(self):
- print("CANCELLING...")
super().cancel()
- if self.check_task is not None and not self.check_task.done():
- self.check_task.cancel()
self.status = {'VALID'}
- def __init__(self, executor, job_id):
- super().__init__(executor, job_id)
+ def __init__(self, job_id):
+ super().__init__(job_id)
self.check_task = None
@@ -193,26 +189,21 @@ class ClaudeJobCheckCredentials(ClaudeJob):
class ClaudeJobList(ClaudeJob):
@staticmethod
- async def ls(evt_cancel, node):
- if evt_cancel.is_set():
- return None
-
+ async def ls(node):
print("we should be listing Cloud content from Node ", node, "...")
await asyncio.sleep(1)
return ["..", "a", "b"]
+ @ClaudeJob.async_looper
def start(self):
self.nbr = 0
self.tot = 0
- self.ls_task = asyncio.run_coroutine_threadsafe(self.ls(self.evt_cancel, self.curr_node), self.loop)
+ self.ls_task = asyncio.ensure_future(self.ls(self.evt_cancel, self.curr_node))
self.status = {'VALID', 'RUNNING'}
+ @ClaudeJob.async_looper
def update(self, user_id, dirs):
- if self.evt_cancel.is_set():
- self.cancel()
- return
-
self.status = {'VALID', 'RUNNING'}
if user_id is None:
@@ -240,12 +231,10 @@ class ClaudeJobList(ClaudeJob):
def cancel(self):
print("CANCELLING...")
super().cancel()
- if self.ls_task is not None and not self.ls_task.done():
- self.ls_task.cancel()
self.status = {'VALID'}
- def __init__(self, executor, job_id, user_id, curr_node):
- super().__init__(executor, job_id)
+ def __init__(self, job_id, user_id, curr_node):
+ super().__init__(job_id)
self.curr_node = curr_node
self.ls_task = None
@@ -324,7 +313,6 @@ class AssetEngineClaude(AssetEngine):
bl_version = (0 << 16) + (0 << 8) + 1 # Usual maj.min.rev version scheme...
def __init__(self):
- self.executor = futures.ThreadPoolExecutor(8) # Using threads for now, if issues arise we'll switch to process.
self.jobs = {}
self.job_uuid = 1
@@ -358,10 +346,10 @@ class AssetEngineClaude(AssetEngine):
if self.job_id_check_credentials is ...:
self.job_id_check_credentials = job_id = self.job_uuid
self.job_uuid += 1
- job = self.jobs[job_id] = ClaudeJobCheckCredentials(self.executor, job_id)
+ job = self.jobs[job_id] = ClaudeJobCheckCredentials(job_id)
else:
- print(self.job_id_check_credentials)
job = self.jobs[self.job_id_check_credentials]
+ print("main check_credentials: ", job)
if job is not None:
self.user_id = job.update()
if self.user_id is not ...:
@@ -450,7 +438,7 @@ class AssetEngineClaude(AssetEngine):
if job is not None and isinstance(job, ClaudeJobList):
if job.curr_node != entries.root_path:
self.reset()
- self.jobs[job_id] = ClaudeJobList(self.executor, job_id, user_id, entries.root_path)
+ job = self.jobs[job_id] = ClaudeJobList(self.executor, job_id, user_id, entries.root_path)
self.root = entries.root_path
elif self.root != entries.root_path:
self.reset()
More information about the Bf-blender-cvs
mailing list