from debuggable import Debuggable
from random import randint, choice
from math import sqrt
from datetime import datetime, timedelta
from planning import Planning
from schedule import Schedule


def gen(N, n):
    """Draw random integers in ``[1, n]`` until the budget ``N`` is exhausted.

    ``N`` is divided by every drawn value; drawing stops as soon as the
    remaining budget is ``<= 1``.

    Args:
        N: Starting budget ("exhaustion level"). Values ``<= 1`` yield ``[]``.
        n: Inclusive upper bound of each drawn integer.

    Returns:
        The list of drawn integers, in draw order.

    Raises:
        ValueError: If ``N > 1`` and ``n < 2``. With ``n == 1`` every draw
            is 1, so the budget would never shrink and the loop would never
            end.
    """
    if N > 1 and n < 2:
        raise ValueError("n must be at least 2 when N > 1")
    res = []
    while N > 1:
        r = randint(1, n)
        N /= r
        res.append(r)
    return res


def abba(lst, item):
    """Return a new list: ``item``, the elements of ``lst``, ``item``."""
    return [item, *lst, item]


def abab(lst, item):
    """Return a new list: ``item``, ``lst``, ``item``, ``lst`` again."""
    return [item, *lst, item, *lst]


def baa(lst, item):
    """Return a new list: the elements of ``lst``, then ``item`` twice."""
    return [*lst, item, item]


def ab(lst, item):
    """Return a new list: ``item`` followed by the elements of ``lst``."""
    return [item, *lst]


def ba(lst, item):
    """Return a new list: the elements of ``lst`` followed by ``item``."""
    return [*lst, item]


def struct(lst):
    """Build a randomly structured sequence of ``{'id', 'weight'}`` dicts.

    For each weight ``w`` at index ``i`` in ``lst`` one dict
    ``{'id': i, 'weight': w}`` is created and merged into the sequence built
    so far by a randomly chosen combinator (``abba``, ``abab``, ``baa``,
    ``ab`` or ``ba``).

    The same dict object can appear several times in the result (the
    combinators repeat ``item`` and/or ``lst``), so mutating one occurrence
    is visible through all of its repetitions.

    Args:
        lst: Iterable of weights.

    Returns:
        The combined list of dicts; ``[]`` when ``lst`` is empty.
    """
    # Direct references instead of a name lookup through a copy of globals().
    combinators = (abba, abab, baa, ab, ba)
    ret = []
    for i, w in enumerate(lst):
        ret = choice(combinators)(ret, {'id': i, 'weight': w})
    return ret


class MyPlugin(Debuggable):
    """Planner for general slots allocations"""

    name = "myPlugin"

    def __init__(self):
        super().__init__()
        # Set by planner_base() after adding schedules; cleared by
        # communicator_base() once they have been reported.
        self.updated = False

    def sensor_base(self, blackboard):
        """Do nothing; this plugin does not use the sensor step."""
        pass

    def communicator_base(self, planning):
        """Emit debug output for all schedules after a rescheduling.

        Only acts when ``self.updated`` is set, and resets the flag so each
        rescheduling is reported once.
        """
        # NOTE(review): the original indentation was lost; the loop is assumed
        # to sit inside the ``if`` so schedules are only dumped on an update.
        if self.updated:
            self.updated = False
            self.debug("my_plugin_schedules", "Rescheduling at {}.".format(datetime.now().isoformat()))
            for schedule in planning.schedules:
                self.debug("my_plugin_schedules", schedule.to_string())

    def planner_base(self, blackboard, planning: Planning):
        """Append a freshly generated batch of schedules to ``planning``.

        Returns early without planning anything while the last schedule in
        ``planning`` still ends in the future.

        Otherwise a random weight list is generated with ``gen`` and arranged
        with ``struct``. Each resulting item gets a raw duration of
        ``1 / sqrt(weight)``, and the raw durations are scaled so that they
        add up to a total of 10 seconds. One ``Schedule`` per item is then
        laid out back to back, starting at the end of the last schedule (or
        at the current time when there is none).

        Example with a total of 10 seconds::

            items = [{'id': 1, 'weight': 4}, {'id': 0, 'weight': 1},
                     {'id': 0, 'weight': 1}, {'id': 1, 'weight': 4}]
            raw   = [0.5, 1.0, 1.0, 0.5]          # 1 / sqrt(weight), sum = 3.0
            durations = [1.667, 3.333, 3.333, 1.667]

        Args:
            blackboard: Not used by this method.
            planning: Source of the last schedule and receiver of the new
                ones (via ``planning.add``).
        """
        # NOTE(review): the original indentation was lost; ``else`` is assumed
        # to belong to ``if last_schedule`` (the other pairing would leave
        # ``start_at_base`` undefined when there is no last schedule).
        last_schedule: Schedule = planning.get_last_schedule()
        if last_schedule:
            self.debug("my_plugin_last_schedule","last_schedule end at {}".format(last_schedule.end_at.isoformat()))
            start_at_base = last_schedule.end_at
            if start_at_base > datetime.now():
                self.debug("my_plugin_skip","skip")
                return
            # NOTE(review): when the last schedule already ended, the new
            # batch starts at that past end time - confirm this is intended.
        else:
            start_at_base = datetime.now()

        total_duration = 10
        items = struct(gen(randint(12, 20), randint(3, 5)))

        # Heavier items get shorter slots: raw share is 1 / sqrt(weight).
        raw = [1 / sqrt(item['weight']) for item in items]
        raw_sum = sum(raw)
        for item, share in zip(items, raw):
            # Repeated occurrences share one dict; they have the same weight
            # and therefore receive the same duration.
            item['duration'] = share * total_duration / raw_sum
        # Example result (from a run with a total of 1):
        # [{'id': 1, 'weight': 4, 'duration': 0.1167},
        #  {'id': 0, 'weight': 1, 'duration': 0.2334},
        #  {'id': 0, 'weight': 1, 'duration': 0.2334},
        #  {'id': 1, 'weight': 4, 'duration': 0.1167},
        #  {'id': 2, 'weight': 2, 'duration': 0.1650},
        #  {'id': 3, 'weight': 3, 'duration': 0.1348}]

        schedules = []
        elapsed: float = 0
        for item in items:
            start_at = start_at_base + timedelta(seconds=elapsed)
            schedules.append(Schedule(
                start_at=start_at,
                end_at=start_at + timedelta(seconds=item['duration']),
                name=item['id'],
                content=item,
            ))
            elapsed += item['duration']

        planning.add(schedules)
        self.updated = True