proton-director/lib/my_plugin.py

122 lines
3.5 KiB
Python

from debuggable import Debuggable
from random import randint, choice
from math import sqrt
from datetime import datetime, timedelta
from planning import Planning
from schedule import Schedule
def gen(N, n):
    """Draw random integers in ``[1, n]`` until they exhaust the level ``N``.

    Starting from ``N``, a random integer ``r`` with ``1 <= r <= n`` is drawn
    and the remaining level is divided by ``r``; this repeats while the
    remaining level is greater than 1.  Every drawn integer is collected,
    including draws of 1 (which do not reduce the level).

    Args:
        N: Initial exhaustion level.  Values ``<= 1`` yield an empty list.
        n: Inclusive upper bound of each drawn integer.

    Returns:
        The list of drawn integers, in draw order.  When non-empty, their
        product is ``>= N``.

    Raises:
        ValueError: If ``N > 1`` and ``n < 2``; no draw could ever reduce
            the level, so the loop would never terminate.
    """
    if N > 1 and n < 2:
        # randint(1, 1) always returns 1, which never shrinks the level.
        raise ValueError("n must be at least 2 when N > 1")

    remaining = N
    res = []
    while remaining > 1:
        r = randint(1, n)
        remaining /= r
        res.append(r)
    return res
def abba(lst, item):
    """Return a new list with ``item`` placed before and after the elements of ``lst``."""
    wrapped = [item]
    wrapped.extend(lst)
    wrapped.append(item)
    return wrapped
def abab(lst, item):
    """Return a new list: ``item``, the elements of ``lst``, ``item``, then ``lst`` again."""
    pattern = [item]
    pattern.extend(lst)
    pattern.append(item)
    # Second pass over lst, exactly like unpacking it a second time.
    pattern.extend(lst)
    return pattern
def baa(lst, item):
    """Return a new list holding the elements of ``lst`` followed by ``item`` twice."""
    tail_doubled = list(lst)
    tail_doubled.extend((item, item))
    return tail_doubled
def ab(lst, item):
    """Return a new list with ``item`` in front of the elements of ``lst``."""
    prefixed = [item]
    prefixed.extend(lst)
    return prefixed
def ba(lst, item):
    """Return a new list with ``item`` appended after the elements of ``lst``."""
    suffixed = list(lst)
    suffixed.append(item)
    return suffixed
def struct(lst):
    """Arrange weights into a randomly structured sequence of items.

    The weight ``w`` at position ``i`` of ``lst`` becomes the item
    ``{'id': i, 'weight': w}``.  For each item, one of the combiners
    ``abba``, ``abab``, ``baa``, ``ab`` or ``ba`` is picked at random and
    applied to the sequence built so far together with the new item.

    Args:
        lst: Iterable of weights.

    Returns:
        A list of item dicts.  Because some combiners insert the item more
        than once (and ``abab`` repeats the earlier sequence), the same dict
        object can occur several times in the result.
    """
    # Pick the function objects directly instead of resolving their names
    # through a fresh copy of globals() on every iteration.
    combiners = (abba, abab, baa, ab, ba)
    ret = []
    for i, w in enumerate(lst):
        combine = choice(combiners)
        ret = combine(ret, {'id': i, 'weight': w})
    return ret
class MyPlugin(Debuggable):
    """Planner for general slots allocations.

    ``planner_base`` generates a random sequence of weighted items and adds
    them to the planning as consecutive schedules; ``communicator_base``
    then logs the planning's schedules once per such update.
    """

    name = "myPlugin"

    def __init__(self):
        super().__init__()
        # True between a planner_base run that added schedules and the next
        # communicator_base call, which reports them and resets the flag.
        self.updated = False

    def sensor_base(self, blackboard):
        """Do nothing; ``blackboard`` is not used."""
        pass

    def communicator_base(self, planning):
        """Log all schedules of ``planning`` if new ones were added since the last call.

        Args:
            planning: Object exposing ``schedules``, an iterable of objects
                with a ``to_string()`` method.
        """
        if not self.updated:
            return
        self.updated = False
        self.debug("my_plugin_schedules", "Rescheduling at {}.".format(datetime.now().isoformat()))
        for schedule in planning.schedules:
            self.debug("my_plugin_schedules", schedule.to_string())

    def planner_base(self, blackboard, planning: Planning):
        """Generate a batch of consecutive schedules and add it to ``planning``.

        Nothing is added while the last existing schedule still ends in the
        future.  Otherwise a total of 10 seconds is split between randomly
        generated items; each item's share is proportional to
        ``1 / sqrt(weight)``.

        Worked example with a total duration of 5::

            L = [4, 2, 3]                        # weights, see gen()
            C = [{'id': 2, 'weight': 3}, {'id': 1, 'weight': 2},
                 {'id': 0, 'weight': 4}, {'id': 0, 'weight': 4},
                 {'id': 2, 'weight': 3}]         # one possible struct(L)
            R = [0.58, 0.71, 0.5, 0.5, 0.58]     # 1 / sqrt(weight)
            sum(R) = 2.86
            durations = [1.01, 1.24, 0.87, 0.87, 1.01]   # R[i] * 5 / sum(R)

        Args:
            blackboard: Not used.
            planning: Planning that provides ``get_last_schedule()`` and
                receives the new schedules through ``add()``.
        """
        last_schedule: Schedule = planning.get_last_schedule()
        # NOTE(review): the source's indentation was lost; the ``else`` is
        # assumed to belong to ``if last_schedule`` so that ``start_at_base``
        # is always bound -- confirm against the original file.
        if last_schedule:
            self.debug("my_plugin_last_schedule","last_schedule end at {}".format(last_schedule.end_at.isoformat()))
            start_at_base = last_schedule.end_at
            if start_at_base > datetime.now():
                # The last schedule has not finished yet: nothing to plan.
                self.debug("my_plugin_skip","skip")
                return
        else:
            start_at_base = datetime.now()

        total_duration = 10  # seconds shared by the whole batch
        weights = gen(randint(12, 20), randint(3, 5))
        items = struct(weights)

        # Heavier items get shorter slots: share is 1 / sqrt(weight),
        # normalised so that the shares of all items add up to the total.
        shares = [1 / sqrt(item['weight']) for item in items]
        share_sum = sum(shares)
        for item, share in zip(items, shares):
            # The same dict may occur several times in items; it then simply
            # receives the same value again.
            item['duration'] = share * total_duration / share_sum

        schedules = []
        elapsed: float = 0  # seconds from start_at_base to the current item
        for item in items:
            start_at = start_at_base + timedelta(seconds=elapsed)
            schedules.append(Schedule(
                start_at=start_at,
                end_at=start_at + timedelta(seconds=item['duration']),
                name=item['id'],
                content=item,
            ))
            elapsed += item['duration']

        planning.add(schedules)
        self.updated = True