Алгоритм перестановки рабочего / временного интервала / фильтрации ограничений

StackOverflow https://stackoverflow.com/questions/1554366

Вопрос

Надеюсь, вы сможете помочь мне с этими ребятами.Это не помощь по работе - это для благотворительной организации очень трудолюбивых волонтеров, которым действительно не помешала бы менее запутанная / раздражающая система расписания, чем та, что у них есть в настоящее время.

Если кто-нибудь знает о хорошем стороннем приложении, которое (безусловно) автоматизировало бы это, это было бы почти так же хорошо.Просто...пожалуйста, не предлагайте случайные расписания, например, для бронирования аудиторий, поскольку я не думаю, что они могут это сделать.

Заранее спасибо за прочтение;Я знаю, что это важный пост.Однако я пытаюсь сделать все возможное, чтобы четко задокументировать это и показать, что я приложил усилия самостоятельно.

Проблема

Мне нужен алгоритм планирования рабочего / временного интервала, который генерирует смены для рабочих, который соответствует следующим критериям:

Входные данные

import datetime.datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)

shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )

Обработка

Сверху, сдвиги и рабочие являются ли две основные входные переменные для обработки

В каждой смене требуется минимальное и максимальное количество работников.Выполнение минимальных требований для смены имеет решающее значение для успеха, но если все остальное терпит неудачу, расписание с пробелами, которые нужно заполнить вручную, лучше, чем "ошибка" :) Основная алгоритмическая проблема заключается в том, что не должно быть ненужных пробелов, когда доступно достаточное количество работников.

В идеале максимальное количество работников за смену должно быть заполнено, но это самый низкий приоритет по сравнению с другими ограничениями, поэтому, если что-то и должно дать результат, то это должно быть это.

Гибкие ограничения

Они немного гибки, и их границы можно немного раздвинуть, если не удается найти "идеальное" решение.Однако эта гибкость должна быть последним средством, а не использоваться случайным образом.В идеале гибкость должна настраиваться с помощью переменной "fudge_factor" или аналогичной.

  • Существует минимальный период времени между двумя сменами.Итак, работник не должен быть расписан на две смены например, в один и тот же день.
  • Существует максимальное количество смен, которое работник может выполнить за определенный период времени (скажем, месяц)
  • Существует максимальное количество определенных смен, которые можно выполнить за месяц (скажем, ночные смены)

Приятно иметь, но не обязательно

Если вы сможете придумать алгоритм, который выполняет вышеперечисленное и включает в себя любой / все из них, я буду серьезно впечатлен и благодарен.Даже дополнительный скрипт для выполнения этих фрагментов по отдельности тоже был бы отличным.

  • Перекрывающиеся смены.Например, было бы неплохо иметь возможность указать смену "на стойке регистрации" и "в бэк-офисе" смена, которая происходит в одно и то же время.Это можно было бы сделать с помощью отдельных вызовов программы с разными данными смены, за исключением того, что ограничения, связанные с планированием людей на несколько смен за заданное время период был бы пропущен.

  • Минимальный период времени переноса для работников, который можно указать для каждого работника (а не для всего мира).Например, если Джо чувствует себя перегруженным работой или имеет дело с личными проблемами, или он новичок, осваивающий азы, мы могли бы назначить ему расписание реже, чем другим работникам.

  • Некоторый автоматизированный / случайный / справедливый способ отбора персонала для заполнения минимального количества мест количество смен, когда нет подходящих работников.

  • Какой-то способ справиться с внезапными отменами и просто заполнить пробелы без изменения других смен.

Выходной тест

Вероятно, алгоритм должен генерировать как можно больше совпадающих Решений, где каждое решение выглядит следующим образом:

class Solution:
    def __init__(self, shifts_workers):
        """shifts_workers -- a dictionary of shift objects as keys, and a
        a lists of workers filling the shift as values."""

        assert isinstance(dict, shifts_workers)
        self.shifts_workers = shifts_workers

Вот тестовая функция для отдельного решения, учитывая приведенные выше данные.Я подумай это верно, но я был бы признателен и за некоторую экспертную оценку по этому поводу.

def check_solution(solution):
  assert isinstance(Solution, solution)

  def shift_check(shift, workers, workers_allowed):
      assert isinstance(Shift, shift):
      assert isinstance(list, workers):
      assert isinstance(list, workers_allowed)

      num_workers = len(workers)
      assert num_workers >= shift.min_workers
      assert num_workers <= shift.max_workers

      for w in workers_allowed:
          assert w in workers

  shifts_workers = solution.shifts_workers

  # all shifts should be covered
  assert len(shifts_workers.keys()) == 3
  assert shift1 in shifts_workers.keys()
  assert shift2 in shifts_workers.keys()
  assert shift3 in shifts_workers.keys()

  # shift_1 should be covered by 2 people - joe, and bob
  shift_check(shift_1, shifts_workers[shift_1], (joe, bob))

  # shift_2 should be covered by 2 people - sam and amy
  shift_check(shift_2, shifts_workers[shift_2], (sam, amy))

  # shift_3 should be covered by 3 people - ned, max, and jim
  shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))

Попытки

Я пытался реализовать это с помощью генетического алгоритма, но, похоже, не могу настроить его совершенно правильно, поэтому, хотя базовый принцип, кажется, работает в одну смену, он не может решить даже простые случаи с несколькими сменами и несколькими работниками.

Моя последняя попытка состоит в том, чтобы сгенерировать все возможные перестановки в качестве решения, а затем сократить перестановки, которые не соответствуют ограничениям.Кажется, это работает намного быстрее и продвинуло меня дальше, но я использую itertools.product() от python 2.6, чтобы помочь сгенерировать перестановки, и я не совсем могу сделать это правильно.Меня бы не удивило, если бы там было много ошибок, так как, честно говоря, проблема не очень хорошо укладывается в моей голове :)

В настоящее время мой код для этого находится в двух файлах:models.py и rota.py.models.py выглядит как:

# -*- coding: utf-8 -*-
class Shift:
    def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
        self.start = start_datetime
        self.end = end_datetime
        self.duration = self.end - self.start
        self.min_coverage = min_coverage
        self.max_coverage = max_coverage

    def __repr__(self):
        return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)

class Duty:
    def __init__(self, worker, shift, slot):
        self.worker = worker
        self.shift = shift
        self.slot = slot

    def __repr__(self):
        return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
        self.worker.dump(indent=indent, depth=depth+1)
        print ind + ">"

class Avail:
    def __init__(self, start_time, end_time):
        self.start = start_time
        self.end = end_time

    def __repr__(self):
        return "<%s to %s>" % (self.start, self.end)

class Worker:
    def __init__(self, name, availabilities):
        self.name = name
        self.availabilities = availabilities

    def __repr__(self):
        return "<Worker %s Avail=%r>" % (self.name, self.availabilities)

    def dump(self,  indent=4,  depth=1):
        ind = " " * (indent * depth)
        print ind + "<Worker %s" % self.name
        for avail in self.availabilities:
            print ind + " " * indent + repr(avail)
        print ind + ">"

    def available_for_shift(self,  shift):
        for a in self.availabilities:
            if shift.start >= a.start and shift.end <= a.end:
                return True
        print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
        return False

class Solution:
    def __init__(self, shifts):
        self._shifts = list(shifts)

    def __repr__(self):
        return "<Solution: shifts=%r>" % self._shifts

    def duties(self):
        d = []
        for s in self._shifts:
            for x in s:
                yield x

    def shifts(self):
        return list(set([ d.shift for d in self.duties() ]))

    def dump_shift(self, s, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<ShiftList"
        for duty in s:
            duty.dump(indent=indent, depth=depth+1)
        print ind + ">"

    def dump(self, indent=4, depth=1):
        ind = " " * (indent * depth)
        print ind + "<Solution"
        for s in self._shifts:
            self.dump_shift(s, indent=indent, depth=depth+1)
        print ind + ">"

class Env:
    def __init__(self, shifts, workers):
        self.shifts = shifts
        self.workers = workers
        self.fittest = None
        self.generation = 0

class DisplayContext:
    def __init__(self,  env):
        self.env = env

    def status(self, msg, *args):
        raise NotImplementedError()

    def cleanup(self):
        pass

    def update(self):
        pass

и rota.py похоже,:

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-

from datetime import datetime as dt
am2 = dt(2009,  10,  1,  2, 0)
am8 = dt(2009,  10,  1,  8, 0)
pm12 = dt(2009,  10,  1,  12, 0)

def duties_for_all_workers(shifts, workers):
    from models import Duty

    duties = []

    # for all shifts
    for shift in shifts:
        # for all slots
        for cov in range(shift.min_coverage, shift.max_coverage):
            for slot in range(cov):
                # for all workers
                for worker in workers:
                    # generate a duty
                    duty = Duty(worker, shift, slot+1)
                    duties.append(duty)

    return duties

def filter_duties_for_shift(duties,  shift):
    matching_duties = [ d for d in duties if d.shift == shift ]
    for m in matching_duties:
        yield m

def duty_permutations(shifts,  duties):
    from itertools import product

    # build a list of shifts
    shift_perms = []
    for shift in shifts:
        shift_duty_perms = []
        for slot in range(shift.max_coverage):
            slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
            shift_duty_perms.append(slot_duties)
        shift_perms.append(shift_duty_perms)

    all_perms = ( shift_perms,  shift_duty_perms )

    # generate all possible duties for all shifts
    perms = list(product(*shift_perms))
    return perms

def solutions_for_duty_permutations(permutations):
    from models import Solution
    res = []
    for duties in permutations:
        sol = Solution(duties)
        res.append(sol)
    return res

def find_clashing_duties(duty, duties):
    """Find duties for the same worker that are too close together"""
    from datetime import timedelta
    one_day = timedelta(days=1)
    one_day_before = duty.shift.start - one_day
    one_day_after = duty.shift.end + one_day
    for d in [ ds for ds in duties if ds.worker == duty.worker ]:
        # skip the duty we're considering, as it can't clash with itself
        if duty == d:
            continue

        clashes = False

        # check if dates are too close to another shift
        if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
            clashes = True

        # check if slots collide with another shift
        if d.slot == duty.slot:
            clashes = True

        if clashes:
            yield d

def filter_unwanted_shifts(solutions):
    from models import Solution

    print "possibly unwanted:",  solutions
    new_solutions = []
    new_duties = []

    for sol in solutions:
        for duty in sol.duties():
            duty_ok = True

            if not duty.worker.available_for_shift(duty.shift):
                duty_ok = False

            if duty_ok:
                print "duty OK:"
                duty.dump(depth=1)
                new_duties.append(duty)
            else:
                print "duty **NOT** OK:"
                duty.dump(depth=1)

        shifts = set([ d.shift for d in new_duties ])
        shift_lists = []
        for s in shifts:
            shift_duties = [ d for d in new_duties if d.shift == s ]
            shift_lists.append(shift_duties)

        new_solutions.append(Solution(shift_lists))

    return new_solutions

def filter_clashing_duties(solutions):
    new_solutions = []

    for sol in solutions:
        solution_ok = True

        for duty in sol.duties():
            num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))

            # check if many duties collide with this one (and thus we should delete this one
            if num_clashing_duties > 0:
                solution_ok = False
                break

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_incomplete_shifts(solutions):
    new_solutions = []

    shift_duty_count = {}

    for sol in solutions:
        solution_ok = True

        for shift in set([ duty.shift for duty in sol.duties() ]):
            shift_duties = [ d for d in sol.duties() if d.shift == shift ]
            num_workers = len(set([ d.worker for d in shift_duties ]))

            if num_workers < shift.min_coverage:
                solution_ok = False

        if solution_ok:
            new_solutions.append(sol)

    return new_solutions

def filter_solutions(solutions,  workers):
    # filter permutations ############################
    # for each solution
    solutions = filter_unwanted_shifts(solutions)
    solutions = filter_clashing_duties(solutions)
    solutions = filter_incomplete_shifts(solutions)
    return solutions

def prioritise_solutions(solutions):
    # TODO: not implemented!
    return solutions

    # prioritise solutions ############################
    # for all solutions
        # score according to number of staff on a duty
        # score according to male/female staff
        # score according to skill/background diversity
        # score according to when staff last on shift

    # sort all solutions by score

def solve_duties(shifts, duties,  workers):
    # ramify all possible duties #########################
    perms = duty_permutations(shifts,  duties)
    solutions = solutions_for_duty_permutations(perms)
    solutions = filter_solutions(solutions,  workers)
    solutions = prioritise_solutions(solutions)
    return solutions

def load_shifts():
    from models import Shift
    shifts = [
        Shift(am2, am8, 2, 3),
        Shift(am8, pm12, 2, 3),
    ]
    return shifts

def load_workers():
    from models import Avail, Worker
    joe_avail = ( Avail(am2,  am8), )
    sam_avail = ( Avail(am2,  am8), )
    ned_avail = ( Avail(am2,  am8), )
    bob_avail = ( Avail(am8,  pm12), )
    max_avail = ( Avail(am8,  pm12), )
    joe = Worker("joe", joe_avail)
    sam = Worker("sam", sam_avail)
    ned = Worker("ned", sam_avail)
    bob = Worker("bob", bob_avail)
    max = Worker("max", max_avail)
    return (joe, sam, ned, bob, max)

def main():
    import sys

    shifts = load_shifts()
    workers = load_workers()
    duties = duties_for_all_workers(shifts, workers)
    solutions = solve_duties(shifts, duties, workers)

    if len(solutions) == 0:
        print "Sorry, can't solve this.  Perhaps you need more staff available, or"
        print "simpler duty constraints?"
        sys.exit(20)
    else:
        print "Solved.  Solutions found:"
        for sol in solutions:
            sol.dump()

if __name__ == "__main__":
    main()

Обрезая отладочный вывод перед результатом, в настоящее время это дает:

Solved.  Solutions found:
    <Solution
        <ShiftList
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker joe
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker sam
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                <Worker ned
                    <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                >
            >
        >
        <ShiftList
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker bob
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
            <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                <Worker max
                    <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                >
            >
        >
    >
Это было полезно?

Решение

Хорошо, я не знаю о конкретном алгоритме, но вот что я бы принял во внимание.

Оценка

Каким бы ни был метод, вам понадобится функция, чтобы оценить, насколько ваше решение удовлетворяет ограничениям.Вы можете использовать подход "сравнения" (без глобальной оценки, но способ сравнения двух решений), но я бы рекомендовал оценку.

Что было бы действительно хорошо, так это если бы вы могли получать оценку за более короткий промежуток времени, например ежедневно, это действительно полезно для алгоритмов, если вы можете "предсказать" диапазон итоговой оценки по частичному решению (например, только за первые 3 дня из 7).Таким образом, вы можете прервать вычисление, основанное на этом частичном решении, если оно уже слишком низкое, чтобы соответствовать вашим ожиданиям.

Симметрия

Вполне вероятно, что среди этих 200 человек у вас есть похожие профили:т.е. люди, разделяющие одни и те же характеристики (доступность, опыт, желание, ...).Если вы возьмете двух человек с одинаковым профилем, они будут взаимозаменяемы:

  • Решение 1:(смена 1:Джо)(смена 2:Джим) ... только для других работников...
  • Решение 2:(смена 1:Джим)(смена 2:Джо) ... только для других работников...

на самом деле это одно и то же решение с вашей точки зрения.

Хорошо то, что обычно у вас меньше профилей, чем у persons, что значительно сокращает время, затрачиваемое на вычисления!

Например, представьте, что вы сгенерировали все решения на основе решения 1, тогда нет необходимости что-либо вычислять на основе решения 2.

Итеративный

Вместо того чтобы генерировать все расписание сразу, вы можете рассмотреть возможность создания его постепенно (скажем, по 1 неделе за раз).Чистый выигрыш заключается в том, что сложность на неделю снижается (появляется меньше возможностей).

Затем, как только у вас будет эта неделя, вы вычисляете вторую, стараясь, конечно, учитывать первую с учетом ваших ограничений.

Преимущество заключается в том, что вы явно разрабатываете свой алгоритм с учетом уже используемого решения, таким образом, при следующем формировании расписания он не заставит человека работать 24 часа подряд!

Сериализация

Вам следует рассмотреть возможность сериализации объектов вашего решения (выбирайте по своему усмотрению, pickle довольно хорош для Python).Вам понадобится предыдущее расписание при создании нового, и я готов поспорить, что вы предпочли бы не вводить его вручную для 200 человек.

Исчерпывающий

Теперь, после всего этого, я бы на самом деле предпочел исчерпывающий поиск, поскольку при использовании симметрии и оценки возможностей может быть не так уж много (проблема остается NP-полной, хотя серебряной пули нет).

Возможно, вы захотите попробовать свои силы в Алгоритм обратного отслеживания.

Кроме того, вам следует взглянуть на следующие ссылки, которые касаются подобных проблем:

В обоих обсуждаются проблемы, возникшие во время внедрения, поэтому их устранение должно вам помочь.

Другие советы

Я пытался реализовать это с помощью генетического алгоритма, но, похоже, не могу настроить его совершенно правильно, поэтому, хотя основной принцип, похоже, работает в отдельные смены, это не может решить даже простые дела с несколькими сменами и несколькими работниками.

Короче говоря, не надо!Если у вас нет большого опыта работы с генетическими алгоритмами, вы не получите этого правильно.

  • Это приблизительные методы, которые не гарантируют сходимости к работоспособному решению.
  • Они работают только в том случае, если вы можете достаточно хорошо установить качество вашего текущего решения (т.е.количество критериев, которые не соблюдены).
  • Их качество критически зависит от качества операторов, которые вы используете для объединения / видоизменения предыдущих решений в новые.

Сложно разобраться в небольшой программе на python, если у вас почти нулевой опыт работы с GA.Если у вас небольшая группа людей, исчерпывающий поиск - не такой уж плохой вариант.Проблема в том, что это может работать правильно для n люди, будут медленными для n+1 людей и будет невыносимо медленным для n+2 и очень может быть, что ваш n в конечном итоге будет всего лишь 10.

Вы работаете над NP-полной задачей, и простых выигрышных решений не существует.Если выбранная вами задача планирования fancy schedule работает недостаточно хорошо, маловероятно, что у вас будет что-то лучшее с вашим скриптом python.

Если вы настаиваете на том, чтобы делать это с помощью своего собственного кода, это намного легче получить некоторые результаты с помощью минимального-максимального или имитированного отжига.

У меня нет выбора алгоритма, но я могу изложить некоторые практические соображения.

Поскольку алгоритм имеет дело с отменами, он должен запускаться всякий раз, когда возникает исключение из расписания, чтобы перепланировать всех.

Учтите, что некоторые алгоритмы не очень линейны и могут радикально перепланировать всех с этого момента.Вы, вероятно, хотите избежать этого, людям нравится знать свое расписание заранее.

Вы можете справиться с некоторыми отменами без повторного запуска алгоритма, потому что он может заранее назначить следующего доступного сотрудника или двух.

Возможно, не всегда возможно или желательно генерировать наиболее оптимальное решение, но вы можете вести постоянное количество событий "менее оптимальных" для каждого работника и всегда выбирать работника с наименьшим количеством, когда вам приходится назначать другой "плохой выбор".Это то, что обычно волнует людей (несколько "плохих" решений о планировании часто / несправедливо).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top