Question

I am currently trying to write a simple multi-threaded program using Python. However, I have run into a bug that I think I am overlooking. I am simply trying to write a program that uses a brute-force approach to the problem below:

How the knight must move...

As can be seen from the image there is a chess board where the knight travels all respective squares.

My approach is simply to try each possible path, where each possible path runs in a new thread. If, at the end of a thread, there are no possible moves left, count how many squares have been visited; if that count is equal to 63, write the solution to a simple text file...

The code is as below:

from thread import start_new_thread
import sys

# Starting value of the thread counter that is handed to runner().
i=1

# Read the knight's starting square from the user.
coor_x = raw_input("Please enter x[0-7]: ")
coor_y = raw_input("Please enter y[0-7]: ")

# Starting square as an (x, y) tuple of ints.
coordinate = int(coor_x), int(coor_y)



def checker(coordinates, previous_moves):
    """Return the knight moves considered available from ``coordinates``.

    Builds the eight knight destinations around ``coordinates`` and collects
    into ``to_be_removed`` those that fall outside 0..7 on either axis or
    that already appear in ``previous_moves``.

    Returns ``possible_moves`` when it is non-empty. Otherwise calls
    ``end_checker(previous_moves)``, prints a message if that returns a
    false value, and falls off the end (implicitly returning ``None``).
    """

    possible_moves = [(coordinates[0]+1, coordinates[1]+2), (coordinates[0]+1, coordinates[1]-2),
                      (coordinates[0]-1, coordinates[1]+2), (coordinates[0]-1, coordinates[1]-2),
                      (coordinates[0]+2, coordinates[1]+1), (coordinates[0]+2, coordinates[1]-1),
                      (coordinates[0]-2, coordinates[1]+1), (coordinates[0]-2, coordinates[1]-1)]

    to_be_removed = []

    # Candidates that leave the 0..7 range on either axis.
    for index in possible_moves:
        (index_x, index_y) = index
        if index_x < 0 or index_x > 7 or index_y < 0 or index_y > 7:
            to_be_removed.append(index)

    # Candidates that have already been visited.
    for index in previous_moves:
        if index in possible_moves:
            to_be_removed.append(index)



    # NOTE: this guard is true only when to_be_removed is empty, so the loop
    # below never has anything to iterate over and possible_moves keeps all
    # eight candidates.
    if not to_be_removed:
        for index in to_be_removed:
            possible_moves.remove(index)


    if len(possible_moves) == 0:
        if not end_checker(previous_moves):
            print "This solution is not correct"
    else:
        return possible_moves

def end_checker(previous_moves):
    """Report whether ``previous_moves`` holds exactly 63 entries.

    When it does, the argument is written to ``knightstour.txt`` (opened in
    "w" mode) and True is returned; otherwise nothing is written and False
    is returned.
    """
    if len(previous_moves) == 63:
        writer = open("knightstour.txt", "w")
        # NOTE(review): previous_moves is handed to write() as-is; the script
        # builds it as a list, so confirm that write() accepts it.
        writer.write(previous_moves)
        writer.close()
        return True
    else:
        return False


def runner(previous_moves, coordinates, i):
    """Start one new thread per candidate move from ``coordinates``.

    If ``end_checker(previous_moves)`` returns a true value, ``sys.exit()``
    is called. Otherwise every move returned by ``checker`` is appended to
    ``previous_moves``, the counter ``i`` is incremented and printed, and
    ``runner`` is started in a new thread for that move.
    """
    if not end_checker(previous_moves):
        process_que = checker(coordinates, previous_moves)
        for processing in process_que:
            # Appends to the same list object that is passed to every
            # thread started below.
            previous_moves.append(processing)
            # i is a parameter, so each call counts up from the value it
            # was given.
            i = i+1
            print "Thread number:"+str(i)
            start_new_thread(runner, (previous_moves, processing, i))
    else:
        sys.exit()



# Move history, seeded with the starting square.
previous_move = []
previous_move.append(coordinate)

# Begin the search from the starting square.
runner(previous_move, coordinate, i)
# Wait for user input before the script ends.
c = raw_input("Type something to exit !")

I am open to all suggestions... My sample output is as below:

Please enter x[0-7]: 4
Please enter y[0-7]: 0
Thread number:2
Thread number:3
Thread number:4
Thread number:5Thread number:4
Thread number:5

Thread number:6Thread number:3Thread number:6Thread number:5Thread number:6
Thread number:7
Thread number:6Thread number:8

Thread number:7

Thread number:8Thread number:7
 Thread number:8



Thread number:4
Thread number:5
Thread number:6Thread number:9Thread number:7Thread number:9
Thread number:10
Thread number:11
Thread number:7
Thread number:8
Thread number:9
Thread number:10
Thread number:11
Thread number:12
Thread number:5Thread number:5
 Thread number:6
Thread number:7
Thread number:8
Thread number:9

Thread number:6
Thread number:7
Thread number:8
Thread number:9

It seems that for some reason the number of threads is stuck at 12... Any help would be most welcome...

Thank you

Was it helpful?

Solution 8

I am John Roach's intern; he gave me this as homework and I was unable to solve it, so I used his account to ask the question. The following is my answer: I found a solution by using a heuristic known as Warnsdorff's rule. But the code I found online produces output like this:

boardsize: 5
Start position: c3

19,12,17, 6,21
 2, 7,20,11,16
13,18, 1,22, 5
 8, 3,24,15,10
25,14, 9, 4,23

Because of that I changed it a little bit: instead of using the standard output, I used P, because P is a tuple. I created a list of tuples named moves and returned it.

def knights_tour(start, boardsize=boardsize, _debug=False):
    """Return the squares visited by a knight's tour, in visiting order.

    Args:
        start: Starting square; converted to a board key by
            ``chess2index(start, boardsize)``.
        boardsize: Side length of the square board.
        _debug: When true, print the board after every move and wait for
            input after each move that follows the first.

    Returns:
        A list, created fresh on every call, holding the starting key
        followed by each key chosen by the loop below.
    """
    board = {(x, y): 0 for x in range(boardsize) for y in range(boardsize)}
    # Built per call: the list used to be an undeclared name, so calls either
    # failed or kept appending to one shared module-level list.
    moves = []
    move = 1
    P = chess2index(start, boardsize)
    moves.append(P)

    board[P] = move
    move += 1
    if _debug:
        print(boardstring(board, boardsize=boardsize))
    while move <= len(board):
        # accessibility() is defined elsewhere; presumably it yields
        # (onward-move count, square) pairs so that min() selects the square
        # with the fewest onward moves (Warnsdorff's rule) - TODO confirm.
        P = min(accessibility(board, P, boardsize))[1]
        moves.append(P)
        board[P] = move
        move += 1
        if _debug:
            print(boardstring(board, boardsize=boardsize))
            input('\n%2i next: ' % move)
    return moves

Now that I had the list of moves, I wrote the following program to create a GIF that animates those moves. The code is below:

import sys
import pygame
import knightmove
import os


pygame.init()

# Top-left corners of the squares recorded so far; filled in by scratch().
square_list = []
# Centre points of those squares in visiting order; joined by draw_line().
line_list = []
# Index of the move drawn on the current frame.
i = 0
# Frame number used in the screenshot file name.
j = 1


def make_gif():
    """Run ImageMagick's ``convert`` to assemble the screenshots into a GIF."""
    command = "convert   -delay 40   -loop 0   Screenshots/screenshot*.png   knights_tour.gif"
    os.system(command)

def get_moves(start_move):
    """Return ``knightmove.knights_tour`` for ``start_move`` with size 8."""
    board_size = 8
    return knightmove.knights_tour(start_move, board_size)

def scratch(move):
    """Record ``move`` and blit the square image on every recorded square.

    The move's two components are scaled by 50 to get the square's top-left
    pixel; that corner goes into ``square_list`` and the point 25 pixels
    further along both axes goes into ``line_list``.
    """
    column, row = move
    left = int(column) * 50
    top = int(row) * 50
    line_list.append([left + 25, top + 25])
    square_list.append([left, top])
    for corner in square_list:
        screen.blit(square, corner)

def draw_line():
    """Join each pair of consecutive points in ``line_list`` with a black line."""
    for begin, end in zip(line_list, line_list[1:]):
        pygame.draw.line(screen, black, begin, end, 2)

def draw_dot():
    """Draw a red circle (radius 3, width 0) at the current move's point.

    Returns whatever ``pygame.draw.circle`` returns.
    """
    centre = line_list[i]
    return pygame.draw.circle(screen, red, centre, 3, 0)

def screenshot():
    """Save the screen as a PNG named after the frame counter ``j``.

    Frame numbers up to 9 get a leading "0" in the file name.
    """
    frame = "0" + str(j) if j <= 9 else j
    target = "/home/renko/PycharmProjects/pygame_tut1/Screenshots/screenshot" + str(frame) + ".png"
    pygame.image.save(screen, target)


# Window size in pixels.
size = width, height = 400, 400
# Colour tuples passed to the pygame drawing calls.
white = 255, 255, 255
black = 0, 0, 0, 0
red = 255, 0, 0

screen = pygame.display.set_mode(size)
# Image that scratch() blits onto every recorded square.
square = pygame.image.load("Untitled.png")

start = raw_input("Enter the start position:")
# Full list of moves; the main loop indexes it with i.
moves = get_moves(start)


while 1:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            sys.exit()
    screen.fill(white)
    pygame.draw.line(screen, black, (0, 50), (400, 50), 3)
    pygame.draw.line(screen, black, (0, 100), (400, 100), 3)
    pygame.draw.line(screen, black, (0, 150), (400, 150), 3)
    pygame.draw.line(screen, black, (0, 200), (400, 200), 3)
    pygame.draw.line(screen, black, (0, 250), (400, 250), 3)
    pygame.draw.line(screen, black, (0, 300), (400, 300), 3)
    pygame.draw.line(screen, black, (0, 350), (400, 350), 3)

    pygame.draw.line(screen, black, (50, 0), (50, 400), 3)
    pygame.draw.line(screen, black, (100, 0), (100, 400), 3)
    pygame.draw.line(screen, black, (150, 0), (150, 400), 3)
    pygame.draw.line(screen, black, (200, 0), (200, 400), 3)
    pygame.draw.line(screen, black, (250, 0), (250, 400), 3)
    pygame.draw.line(screen, black, (300, 0), (300, 400), 3)
    pygame.draw.line(screen, black, (350, 0), (350, 400), 3)



    scratch(moves[i])
    draw_line()
    draw_dot()
    screenshot()
    i += 1
    j += 1
    pygame.display.flip()
    if i == 64:
        make_gif()
        print "GIF was created"
        break

Just so you know the imported knight move library is the library that I created using the rosettacode.org algorithm.

And yes... I was sent on a snipe hunt... :(

OTHER TIPS

Your so-called Quest of the Knights Who Say Ni problem, while a clever rephrasing for asking a Python question, is more widely known as the Knights Tour mathematical problem. Given that and the fact you're a math teacher, I suspect your question's likely a fool's errand (aka snipe hunt) and that you're fully aware of the following fact:

According to a section of Wikipedia's article on the Knights Tour problem:

5.1 Brute force algorithms

A brute-force search for a knight's tour is impractical on all but the
smallest boards; for example, on an 8x8 board there are approximately
4 × 10^51 possible move sequences, and it is well beyond the capacity
of modern computers (or networks of computers) to perform operations
on such a large set.

Exactly 3,926,356,053,343,005,839,641,342,729,308,535,057,127,083,875,101,072 of them according to a footnote link.

There are several issues with your current code.

The first issue I see is that your checker never determines any potential move to be invalid. You have a bug in the conditional in this block:

# Quoted from the question: the guard is true only when the list is empty.
if not to_be_removed:
    for index in to_be_removed:
        possible_moves.remove(index)

This code only runs the loop if to_be_removed is empty. Since looping over an empty list terminates immediately, it does nothing. I think you want the test to be if to_be_removed, which checks for a list that has something in it. But that test is not necessary: you can just run the loop unconditionally and let it do nothing if the list is empty.

Or better yet, don't use a to_be_removed list at all and directly filter your possible_moves with a list comprehension:

def checker(coordinates, previous_moves):
    """Return the knight moves from ``coordinates`` that are still open.

    A destination is kept when both of its components lie in 0..7 and the
    (x, y) tuple is not in ``previous_moves``. The result is always a list
    (possibly empty) of tuples, in a fixed offset order.
    """
    col, row = coordinates[0], coordinates[1]
    knight_offsets = ((1, 2), (1, -2), (-1, 2), (-1, -2),
                      (2, 1), (2, -1), (-2, 1), (-2, -1))

    valid_moves = []
    for d_col, d_row in knight_offsets:
        target = (col + d_col, row + d_row)
        on_board = 0 <= target[0] <= 7 and 0 <= target[1] <= 7
        if on_board and target not in previous_moves:
            valid_moves.append(target)

    return valid_moves  # always a list, even when no move is left

The second issue I see is regarding your previous_moves list. Your threads are all using references to the same list instance, and as they mutate it (by calling append in runner), they're going to mess up its value for each other. That is, after the first thread runs and launches its eight child threads, they'll each see the same situation, with all eight spots already visited. You can perhaps work around this by copying the list rather than mutating it (e.g. previous_moves + [processing]).

The third issue is that your thread numbering system won't work the way you want it to. The reason is that each thread numbers its eight child threads with the values immediately following its own number. So thread 1 spawns threads 2-9, but thread 2 spawns threads 3-10, reusing a bunch of numbers.

There are various ways you could come up with better numbers, but they're not entirely trivial. You could use a global variable that you increment each time you launch a new thread, but that will require a lock for synchronization to make sure two threads don't both try to increment it simultaneously. Or you could use some sort of mathematical scheme to make the child thread numbers unique (e.g. the child threads of thread i are i*8 plus a number from 0-7), but this would probably require skipping some thread numbers, since you can't know in advance which threads will not be needed due to invalid moves.

A fourth issue is that your output code will only let you see the last result in your data file even if many solutions are found. This is because you open the output file with the "w" mode, which erases the file's previous contents. You probably want to use "a" (append) or "r+" (read and write, without truncating).

My final suggestion isn't a specific error in the code, but more a general point. Using threads in this program doesn't seem to gain you anything. Threaded Python code can never run in parallel, even if you have multiple cores in your CPU, due to the Global Interpreter Lock. Threading can make sense for I/O-bound code, but for CPU-bound code like yours, it's going to add overhead and debugging difficulty for no possible gain.

A more basic solution that simply recurses within a single thread, or which uses some other strategy like backtracking to examine all of the search space will almost certainly be better.

there are two issues that I can see here:

1) You are counting your threads using the variable i. But i is passed to all child threads from the calling thread. So the first thread will pass 1,2,3 to the first 3 child threads. But the child thread labelled 1 will then pass 2,3,4 to its 3 children (the grandchild threads of the original thread). In other words, you are duplicating the thread numbers in different threads, which is one reason why you are not counting over 12. You can get around this in a couple of ways - the easiest is probably to use a variable declared outside the scope of the runner function and use a lock to make sure that two threads don't modify it at the same time:

runnerLock = threading.Lock()
i=0
def runner(previous_moves, coordinates):
global i
if not end_checker(previous_moves):
    process_que = checker(coordinates, previous_moves)
    for processing in process_que:
        previous_moves.append(processing)
        runnerLock.acquire()
        i = i+1
        print "Thread number:"+str(i)
        runnerLock.release()
        start_new_thread(runner, (previous_moves, processing))
else:
    sys.exit()

2) The second issue is that in the runner function you are doing:

    previous_moves.append(processing)

in a for loop in which you kick off a new thread for each of the possible moves from the current location. The problem with this is that if there are 4 possible moves you would like to kick off threads for, the first thread will get the current previous moves plus its own new move appended (which is what you want). However, the second will get the previous moves + the first new move you kicked off a thread for + the second new move you kicked off a thread for. So its history of previous moves is now corrupted (it contains the possibility the other thread is trying as well as the one it is meant to be trying). The third has 2 extra possibilities, and so on. This can be corrected by doing (untested):

runnerLock = threading.Lock()
i=0
def runner(previous_moves, coordinates):
global i
if not end_checker(previous_moves):
    process_que = checker(coordinates, previous_moves)
    temp_previous_moves = previous_moves.deepcopy()
    for processing in process_que:
        temp_previous_moves.append(processing)
        runnerLock.acquire()
        i = i+1
        print "Thread number:"+str(i)
        runnerLock.release()
        start_new_thread(runner, (temp_previous_moves, processing))
else:
    sys.exit()

This way of doing it also avoids the need for a lock for the previous_moves array (which was being modified in all your different threads simultaneously as you were doing it)

I tried to do a very similar thing (exploring a large combinatorial search tree) in Python using multiprocessing. I implemented a kind of work-stealing algorithm. You can find the result of my experiment, which is aiming to go into Sagemath, at this patch. However, I finally realized that Python is a very bad language for this. I strongly suggest trying the Cilk++ language, which is a superset of C++. It is particularly well suited to this kind of problem. You can, for example, find a solution of the 8-queens problem. I'm sorry this is only a link answer, but I lost a lot of time trying to do that in Python before realizing that this is not the right way.

Note that writing to a file is not thread safe.

import thread
import sys

# Starting value of the thread counter that is handed to runner().
i=1

# Read the knight's starting square from the user.
coor_x = raw_input("Please enter x[0-7]: ")
coor_y = raw_input("Please enter y[0-7]: ")

# Starting square as an (x, y) tuple of ints.
coordinate = int(coor_x), int(coor_y)



def checker(coordinates, previous_moves):

    possible_moves = [(coordinates[0]+1, coordinates[1]+2), (coordinates[0]+1, coordinates[1]-2), 
                      (coordinates[0]-1, coordinates[1]+2), (coordinates[0]-1, coordinates[1]-2),    
                      (coordinates[0]+2, coordinates[1]+1), (coordinates[0]+2, coordinates[1]-1),    
                      (coordinates[0]-2, coordinates[1]+1), (coordinates[0]-2, coordinates[1]-1)] 

    possible_moves = [(x,y) for x,y in possible_moves if x >= 0 and x < 8 and y >=0 and y < 8]

    possible_moves = [move for move in possible_moves if move not in previous_moves]

    if len(possible_moves) == 0:
        if not end_checker(previous_moves):
            print "This solution is not correct"
    else:
        return possible_moves

def end_checker(previous_moves):
    """Record ``previous_moves`` when it holds exactly 63 entries.

    Args:
        previous_moves: Sequence of squares visited so far.

    Returns:
        True after appending ``str(previous_moves)`` plus a newline to
        ``knightstour.txt``; False (and nothing written) for any other
        length.
    """
    # NOTE(review): the driver script seeds the list with the starting
    # square, so a complete 8x8 tour would hold 64 entries - confirm that
    # 63 is the intended threshold.
    if len(previous_moves) != 63:
        return False
    # Append so that every solution gets its own line rather than each new
    # one truncating the file; the with-block closes the handle even if the
    # write fails.
    with open("knightstour.txt", "a") as writer:
        writer.write(str(previous_moves) + "\n")
    return True


def runner(previous_moves, coordinates, i):
    if not end_checker(previous_moves):
        process_que = checker(coordinates, previous_moves)
        if not process_que:
            thread.exit()
        for processing in process_que:
            previous_moves.append(processing)
            i = i+1
            print "Thread number:"+str(i)
            thread.start_new_thread(runner, (previous_moves, processing, i))
    else:
        sys.exit()



# Move history, seeded with the starting square.
previous_move = []
previous_move.append(coordinate)

# Begin the search from the starting square.
runner(previous_move, coordinate, i)
# Wait for user input before the script ends.
c = raw_input("Type something to exit !")

Here is a solution I've come up with cause I found this interesting (didn't solve it within a minute... so may be a bit off somewhere... Used Depth-First search, but can easily be changed):

#!/usr/bin/env python
# you should probably be using threading... python docs suggest thread is low-level
from threading import Thread
import itertools
import time


def is_legal(move):
    """Return True when both components of ``move`` lie in 0..7."""
    column, row = move[0], move[1]
    return 0 <= column < 8 and 0 <= row < 8


def get_moves(current, previous_moves):
    """Yield every legal, unvisited knight destination from ``current``.

    Each destination is yielded as a two-element list. It is skipped when
    ``is_legal`` rejects it or when it already appears in
    ``previous_moves``.
    """
    offsets = itertools.chain(itertools.product([1, -1], [2, -2]),
                              itertools.product([2, -2], [1, -1]))
    for step_x, step_y in offsets:
        target = [current[0] + step_x, current[1] + step_y]
        if not is_legal(target):
            continue
        if target in previous_moves:
            continue
        yield target


def solve_problem(current, previous_moves):
    # add location to previous moves...
    previous_moves.append(current)
    threads = []
    for move in get_moves(current, previous_moves):
        # start a thread for every legal move
        t = Thread(target=solve_problem, args=(list(move), list(previous_moves)))
        threads.extend([t])
        t.start()
        # dfs prevent resource overflow...
        # - makes threads redundant as mentioned in comments
        # t.join()
    if len(previous_moves) % 20 == 0:
        #   print "got up to %d moves !\n" % len(previous_moves)
        pass

    if len(previous_moves) == 64:
        print " solved !\n" % len(previous_moves)
        # check to see if we're done
        t = int(time.time())
        with open('res_%d' % t, 'w') as f:
            f.write("solution: %r" % previous_moves)
            f.close()

#    for t in threads:
#        t.join()


# Script entry point: read the starting square and launch the search.
if "__main__" == __name__:
    print "starting..."
    coor_x = int(raw_input("Please enter x[0-7]:"))
    coor_y = int(raw_input("Please enter y[0-7]:"))
    # Squares are handled as two-element lists in this script.
    start = [coor_x, coor_y]
    print "using start co-ordinations: %r" % start
    # Begin with an empty path; solve_problem() appends the start itself.
    solve_problem(start, [])

Threading vs Thread

From a quick re-inspection of your code: try to make sure things are actually copied for your sub-threads. From what I've read, Python threads share memory by default.

Please have a look at this code which solves a particular type of the Knight Sequence Tour Problem. It doesn't use a multi-threaded approach but it is highly-optimised (algorithmically) to simulate the Knight Sequence Tour Problem, if it is speed you are looking for (and it doesn't use threads). I'm sure you can adapt it for your use case. Just modify the build_keypad function to match the chess board topology and remove the vowel constraints code. Hope it helps.

__author__ = 'me'
'''
Created on Jun 5, 2012

@author: Iftikhar Khan
'''
# Sequence position at which build_sequence() stops recursing and counts one.
REQD_SEQUENCE_LENGTH = 10
# Largest vowel count build_sequence() accepts before it returns 0.
VOWEL_LIMIT = 2
# Keypad coordinates that build_sequence() counts as vowels.
VOWELS = [(0, 0), (4, 0), (3, -1), (4, -2)]


def build_keypad():
    """Return the keypad as a list of (x, y) positions.

    Covers x in 0..4 and y in -3..0, leaving out the two positions
    (0, -3) and (4, -3).
    """
    excluded = ((0, -3), (4, -3))
    keypad = []
    for x in range(5):
        for y in range(-3, 1):
            if (x, y) not in excluded:
                keypad.append((x, y))
    return keypad


def check_position(position):
    """Return True when ``position`` is on the keypad, False otherwise.

    The two excluded positions (0, -3) and (4, -3) are rejected first;
    anything else must have x in 0..4 and y in -3..0.
    """
    is_excluded = position == (0, -3) or position == (4, -3)
    if is_excluded:
        return False
    return -1 < position[0] < 5 and -4 < position[1] < 1


def build_map(keypad):
    """Map every key to the list of keys a knight move can reach from it.

    Keys with no valid destination get no entry in the returned dict.
    """
    knight_steps = ((1, -2), (1, 2), (2, -1), (2, 1),
                    (-1, -2), (-1, 2), (-2, -1), (-2, 1))
    keymap = {}
    for origin in keypad:
        targets = [(origin[0] + dx, origin[1] + dy) for dx, dy in knight_steps]
        reachable = [target for target in targets if check_position(target)]
        if reachable:
            keymap.setdefault(origin, []).extend(reachable)
    return keymap


def build_sequence(k, p, m, v, ks):
    """Count the sequences that continue from key ``k`` at position ``p``.

    Args:
        k: Current key.
        p: Position of ``k`` in the sequence.
        m: Memo dict keyed by the 3-tuple (k, p, v); updated in place.
        v: Vowel count before ``k`` is taken into account.
        ks: Mapping from a key to the keys reachable from it.

    Returns:
        0 when counting ``k`` pushes the vowel count above VOWEL_LIMIT,
        1 when ``p`` equals REQD_SEQUENCE_LENGTH, and otherwise the
        (memoized) sum over all keys reachable from ``k``.
    """
    if k in VOWELS:
        v += 1
        if v > VOWEL_LIMIT:
            return 0

    state = (k, p, v)
    if p == REQD_SEQUENCE_LENGTH:
        m[state] = 0
        return 1
    if state in m:
        return m[state]

    m[state] = 0
    for neighbour in ks[k]:
        m[state] += build_sequence(neighbour, p + 1, m, v, ks)
    return m[state]


def count(keys):
    """Sum ``build_sequence`` over every key in ``keys``.

    Each key starts at sequence position 1 with a vowel count of 0; one
    memo dict is shared by all of the calls.
    """
    memo = {}
    total = 0
    for key in keys:
        total += build_sequence(key, 1, memo, 0, keys)
    return total


if __name__ == '__main__':
    # Build the keypad, derive its knight-move map, then print the count.
    keypad = build_keypad()
    keymap = build_map(keypad)
    print(count(keymap))
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top