proton-director/lib/director.py

74 lines
2.4 KiB
Python

# -*- coding: utf-8 -*-
# -*- mode: Python -*-
import asyncio
import inspect
import re
from debuggable import Debuggable
from plan import Plan
from blackboard import Blackboard
from schedule import Schedule
from planning import Planning
class Director(Debuggable):
    """Drive registered plugins through a perceive / plan / communicate loop.

    Plugins are registered with :meth:`add_plugin`. Their callable members
    are discovered by name prefix (``sensor``, ``planner``, ``communicator``)
    and stored as ``(plugin_name, method_name)`` pairs. Every tick calls all
    sensors, then all planners, then all communicators, each group in
    registration order.
    """

    # Seconds slept at the end of every tick.
    TICK_INTERVAL = 0.1

    # (method-name prefix, name of the registry attribute collecting it).
    # The prefix doubles as the role label in the debug message.
    _ROLE_REGISTRIES = (
        ("sensor", "sensors"),
        ("planner", "planners"),
        ("communicator", "communicators"),
    )

    def __init__(self, debug_tags=None):
        """Create a director with no plugins registered.

        Args:
            debug_tags: Optional collection of debug tags. When non-empty it
                is passed to ``self._debugger.add_selected_tags`` (the
                ``_debugger`` attribute is presumably set up by
                ``Debuggable.__init__`` -- confirm against that class).
        """
        super().__init__()
        # ``None`` replaces the former mutable ``[]`` default so that no list
        # object is shared between instances through the debugger.
        if debug_tags:
            self._debugger.add_selected_tags(debug_tags)
        self.plugins = {}
        self.sensors = []
        self.planners = []
        self.communicators = []
        self.blackboard = {}  # Blackboard()
        self.planning = Planning()

    async def tick(self):
        """Run one perceive -> plan -> communicate cycle, then sleep."""
        self.on_perceive()
        self.on_plan()
        self.on_communicate()
        await asyncio.sleep(self.TICK_INTERVAL)

    def on_perceive(self):
        """Call every registered sensor with the blackboard.

        Sensors are required to update the blackboard.
        """
        # Pass the world state to all sensors.
        for plugin_name, method_name in self.sensors:
            handler = getattr(self.plugins[plugin_name], method_name)
            handler(blackboard=self.blackboard)

    def on_plan(self):
        """Call every registered planner with the blackboard and planning."""
        for plugin_name, method_name in self.planners:
            handler = getattr(self.plugins[plugin_name], method_name)
            handler(blackboard=self.blackboard, planning=self.planning)

    def on_communicate(self):
        """Call every registered communicator with the planning."""
        for plugin_name, method_name in self.communicators:
            handler = getattr(self.plugins[plugin_name], method_name)
            handler(planning=self.planning)

    async def run(self):
        """Tick forever; only ends if a tick raises or the task is cancelled."""
        while True:
            await self.tick()

    def add_plugin(self, plugin):
        """Register *plugin* and the role methods it exposes.

        Callable members whose names start with ``sensor``, ``planner`` or
        ``communicator`` are recorded in the matching registry. Registering
        a second plugin under an already-used name replaces the first one,
        including the handlers recorded for it.

        Args:
            plugin: Object exposing a ``name`` attribute, used as its key.

        Raises:
            Exception: If *plugin* has no ``name`` attribute.
        """
        if not hasattr(plugin, "name"):
            raise Exception("No property 'name' in plugin '{}'".format(plugin))
        plugin_name = plugin.name

        # Drop handlers left over from a previous plugin of the same name;
        # otherwise they would run twice, or fail if the replacement lacks
        # the method. Slice assignment keeps the list objects themselves.
        for _, registry_name in self._ROLE_REGISTRIES:
            registry = getattr(self, registry_name)
            registry[:] = [entry for entry in registry if entry[0] != plugin_name]

        self.plugins[plugin_name] = plugin
        for method_name, member in inspect.getmembers(plugin):
            # Data attributes that merely share a prefix (e.g. a
            # ``sensor_count`` field) cannot be invoked on tick; skip them.
            if not callable(member):
                continue
            for prefix, registry_name in self._ROLE_REGISTRIES:
                if method_name.startswith(prefix):
                    getattr(self, registry_name).append((plugin_name, method_name))
                    self.debug(
                        "config",
                        "Added {} {}::{}".format(prefix, plugin_name, method_name),
                    )
                    break