init: first version

This commit is contained in:
alban 2021-12-03 22:34:34 +01:00
commit 966d4819b4
5 changed files with 326 additions and 0 deletions

141
.gitignore vendored Normal file
View File

@ -0,0 +1,141 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Pycharm
.idea

2
README.md Normal file
View File

@ -0,0 +1,2 @@
# Proton Director
Work in progress

69
debug.py Executable file
View File

@ -0,0 +1,69 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
import os
import socket
import sys
import argparse
'''
AI director debugger
Interactive and dynamic debug user interface
* The debugger server opens a local socket connection for realtime commands
* The debugger server has a many methods available (list, set, etc.)
* The user can activate tags: the debugger will output messages with active flag at reception
* The user can debug objects: the object will output part or all of its internals
Licensed under GNU GPLv3
by alban
'''
valid_commands = {
'list': {
"api": "list_tags",
"help": "List available tags",
"tags": None
},
'add': {
"api": "add_tags",
"help": "Debug the given tag(s)",
"tags": "tags"
},
'del': {
"api": "def_tags",
"help": "Remove from debug the given tag(s)",
"tags": "tags"
},
'help': {
"api": "help",
"help": "Request help",
"tags": None
},
}
socket_addr = "/tmp/._my_debug_socket"
parser = argparse.ArgumentParser(description="AI Director debugger utility")
subparsers = parser.add_subparsers(dest='command', help="API request")
for i in valid_commands:
sub = subparsers.add_parser(i)
item = valid_commands[i]
if item["tags"]:
sub.add_argument("tags", type=str, nargs="+", help=item["help"])
parser.add_argument("--socket", help="Socket location. Default: '{}' ".format(socket_addr), default=socket_addr,
required=False)
args = parser.parse_args()
if args.command not in valid_commands:
raise Exception("Invalid command '{}' not in {}".format(args.command, ", ".join(valid_commands)))
command = valid_commands[args.command]["api"]
tags = " ".join(args.tags) if "tags" in args else ""
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(socket_addr)
s.send(bytes("{} {}".format(command,tags), "utf-8"))
data = s.recv(1024)
print('Received', str(data, "utf-8"))

55
lib/planning.py Normal file
View File

@ -0,0 +1,55 @@
from debuggable import Debuggable
from schedule import Schedule
def check_schedules(schedules):
for schedule in schedules:
if isinstance(schedule, Schedule):
continue
raise Exception("Invalid argument provided: {}".format(schedule))
class Planning(Debuggable):
schedules: list = []
def __init__(self):
super(Planning, self).__init__()
def add(self, schedules):
try:
# check if all are valid schedules
check_schedules(schedules)
# clean the schedules for elapsed
self.clean_schedules()
# add the schedules
self.debug("planning_add_schedules", "Adding {} schedules".format(len(schedules)))
self.schedules.extend(schedules)
self.debug("planning_add_schedules", "Got {} schedules after".format(len(self.schedules)))
# sort
self.schedules.sort(key=lambda x: x.start_at)
except Exception as exc:
print(exc)
def clean_schedules(self):
removed = 0
self.debug("planning_clean_schedules","Got {} schedules before cleaning".format(len(self.schedules)))
schedule_length = len(self.schedules)
for j in range(0,schedule_length):
i = schedule_length - j - 1
schedule = self.schedules[i]
if schedule.elapsed():
del(self.schedules[i])
self.debug("planning_clean_schedules_indexes", "Removed index {}/{}".format(i,schedule_length))
removed += 1
self.debug("planning_clean_schedules","Removed {} schedules".format(removed))
def get_last_schedule(self):
end_at_ordered = self.schedules.copy()
if not end_at_ordered:
return None
end_at_ordered.sort(key=lambda x: x.end_at)
return end_at_ordered.pop() if len(self.schedules) else None
# EOF

59
main.py Executable file
View File

@ -0,0 +1,59 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -*- mode: Python -*-
'''
v0.0.0
AI director
Licensed under GNU GPLv3
by alban
'''
from os import path, getcwd
import sys
abspath, filename = path.split(path.realpath(__file__ ))
sys.path.insert(0, path.join(abspath,"lib"))
import argparse
import asyncio
import atexit
import debugger
from lib.director import Director
from lib.my_plugin import MyPlugin
# from actor_redis import RedisActor
# from sensor_redis import RedisSensor
# from planner_dummy import DummyPlanner
# from planner_duration import DurationPlanner
# from planner_scheduler import SchedulerPlanner
from lib.planner_content import ContentPlanner
debugger.init()
def exit_handler():
print('\n\nLaunch resources cleanup before exit.\n\n')
debugger.instance.cleanup()
atexit.register(exit_handler)
argsparser = argparse.ArgumentParser(description="AI Director main script")
argsparser.add_argument("--tags","-t", nargs='+', help="Default Debug tags", default="default")
args = argsparser.parse_args()
async def main():
director = Director(debug_tags=args.tags)
director.add_plugin(MyPlugin())
# director.add_plugin(audioPlugin())
server = await debugger.instance.server()
async with server:
await asyncio.gather(
server.serve_forever(),
director.run()
)
asyncio.run(main())