Trouble with Implementing Viterbi Algorithm for Robot Localization in Python

I’m working on a project that implements the Viterbi algorithm for robot localization on a grid map using noisy sensor readings. However, my implementation produces incorrect results, and I can’t tell where the mistake is. Below is my code:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>import numpy as np
def parse_input(file_path):
with open(file_path, 'r') as file:
rows, cols = map(int, file.readline().strip().split())
map_data = [file.readline().strip().split() for _ in range(rows)]
num_observations = int(file.readline().strip())
observations = [file.readline().strip() for _ in range(num_observations)]
error_rate = float(file.readline().strip())
return rows, cols, map_data, num_observations, observations, error_rate
def initialize_probabilities(rows, cols, map_data):
num_traversable = sum(row.count('0') for row in map_data)
initial_prob = 1 / num_traversable
probabilities = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
probabilities[(i, j)] = initial_prob
return probabilities
def get_neighbors(x, y, rows, cols, map_data):
neighbors = []
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if 0 <= nx < rows and 0 <= ny < cols and map_data[nx][ny] == '0':
neighbors.append((nx, ny))
return neighbors
def build_transition_matrix(rows, cols, map_data):
transitions = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
neighbors = get_neighbors(i, j, rows, cols, map_data)
transitions[(i, j)] = {neighbor: 1 / len(neighbors) for neighbor in neighbors}
return transitions
def sensor_reading_to_bits(sensor_reading):
directions = ['N', 'S', 'W', 'E']
return {directions[i]: int(sensor_reading[i]) for i in range(4)}
def calculate_emission_probability(sensor_bits, actual_bits, error_rate):
distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits)
return (1 - error_rate) ** (4 - distance) * error_rate ** distance
def build_emission_matrix(rows, cols, map_data, observations, error_rate):
emissions = {}
for obs in observations:
obs_bits = sensor_reading_to_bits(obs)
emission = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
actual_bits = {
'N': 1 if i > 0 and map_data[i-1][j] == 'X' else 0,
'S': 1 if i < rows-1 and map_data[i+1][j] == 'X' else 0,
'W': 1 if j > 0 and map_data[i][j-1] == 'X' else 0,
'E': 1 if j < cols-1 and map_data[i][j+1] == 'X' else 0,
}
emission[(i, j)] = calculate_emission_probability(obs_bits, actual_bits, error_rate)
emissions[obs] = emission
return emissions
def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix):
num_steps = len(observations)
trellis = {pos: [0] * num_steps for pos in initial_probabilities.keys()}
# Initialization
for pos in initial_probabilities.keys():
trellis[pos][0] = initial_probabilities[pos] * emission_matrix[observations[0]][pos]
# Iteration
for t in range(1, num_steps):
for pos in initial_probabilities.keys():
max_prob = 0
for prev_pos in transition_matrix.keys():
if pos in transition_matrix[prev_pos]:
prob = trellis[prev_pos][t-1] * transition_matrix[prev_pos][pos] * emission_matrix[observations[t]][pos]
if prob > max_prob:
max_prob = prob
trellis[pos][t] = max_prob
return trellis
def save_output(trellis, file_path, rows, cols):
maps = [np.zeros((rows, cols)) for _ in range(len(next(iter(trellis.values()))))]
for pos, probs in trellis.items():
x, y = pos
for t, prob in enumerate(probs):
maps[t][x][y] = prob
np.savez(file_path, *maps)
# Main function to run the entire process
def main(input_file, output_file):
rows, cols, map_data, num_observations, observations, error_rate = parse_input(input_file)
initial_probabilities = initialize_probabilities(rows, cols, map_data)
transition_matrix = build_transition_matrix(rows, cols, map_data)
emission_matrix = build_emission_matrix(rows, cols, map_data, observations, error_rate)
trellis = viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix)
save_output(trellis, output_file, rows, cols)
# Example usage
if __name__ == "__main__":
import sys
input_file = sys.argv[1]
output_file = 'output.npz'
main(input_file, output_file)
</code>
<code>import numpy as np def parse_input(file_path): with open(file_path, 'r') as file: rows, cols = map(int, file.readline().strip().split()) map_data = [file.readline().strip().split() for _ in range(rows)] num_observations = int(file.readline().strip()) observations = [file.readline().strip() for _ in range(num_observations)] error_rate = float(file.readline().strip()) return rows, cols, map_data, num_observations, observations, error_rate def initialize_probabilities(rows, cols, map_data): num_traversable = sum(row.count('0') for row in map_data) initial_prob = 1 / num_traversable probabilities = {} for i in range(rows): for j in range(cols): if map_data[i][j] == '0': probabilities[(i, j)] = initial_prob return probabilities def get_neighbors(x, y, rows, cols, map_data): neighbors = [] for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]: nx, ny = x + dx, y + dy if 0 <= nx < rows and 0 <= ny < cols and map_data[nx][ny] == '0': neighbors.append((nx, ny)) return neighbors def build_transition_matrix(rows, cols, map_data): transitions = {} for i in range(rows): for j in range(cols): if map_data[i][j] == '0': neighbors = get_neighbors(i, j, rows, cols, map_data) transitions[(i, j)] = {neighbor: 1 / len(neighbors) for neighbor in neighbors} return transitions def sensor_reading_to_bits(sensor_reading): directions = ['N', 'S', 'W', 'E'] return {directions[i]: int(sensor_reading[i]) for i in range(4)} def calculate_emission_probability(sensor_bits, actual_bits, error_rate): distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits) return (1 - error_rate) ** (4 - distance) * error_rate ** distance def build_emission_matrix(rows, cols, map_data, observations, error_rate): emissions = {} for obs in observations: obs_bits = sensor_reading_to_bits(obs) emission = {} for i in range(rows): for j in range(cols): if map_data[i][j] == '0': actual_bits = { 'N': 1 if i > 0 and map_data[i-1][j] == 'X' else 0, 'S': 1 if i < rows-1 and map_data[i+1][j] == 'X' else 0, 'W': 1 
if j > 0 and map_data[i][j-1] == 'X' else 0, 'E': 1 if j < cols-1 and map_data[i][j+1] == 'X' else 0, } emission[(i, j)] = calculate_emission_probability(obs_bits, actual_bits, error_rate) emissions[obs] = emission return emissions def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix): num_steps = len(observations) trellis = {pos: [0] * num_steps for pos in initial_probabilities.keys()} # Initialization for pos in initial_probabilities.keys(): trellis[pos][0] = initial_probabilities[pos] * emission_matrix[observations[0]][pos] # Iteration for t in range(1, num_steps): for pos in initial_probabilities.keys(): max_prob = 0 for prev_pos in transition_matrix.keys(): if pos in transition_matrix[prev_pos]: prob = trellis[prev_pos][t-1] * transition_matrix[prev_pos][pos] * emission_matrix[observations[t]][pos] if prob > max_prob: max_prob = prob trellis[pos][t] = max_prob return trellis def save_output(trellis, file_path, rows, cols): maps = [np.zeros((rows, cols)) for _ in range(len(next(iter(trellis.values()))))] for pos, probs in trellis.items(): x, y = pos for t, prob in enumerate(probs): maps[t][x][y] = prob np.savez(file_path, *maps) # Main function to run the entire process def main(input_file, output_file): rows, cols, map_data, num_observations, observations, error_rate = parse_input(input_file) initial_probabilities = initialize_probabilities(rows, cols, map_data) transition_matrix = build_transition_matrix(rows, cols, map_data) emission_matrix = build_emission_matrix(rows, cols, map_data, observations, error_rate) trellis = viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix) save_output(trellis, output_file, rows, cols) # Example usage if __name__ == "__main__": import sys input_file = sys.argv[1] output_file = 'output.npz' main(input_file, output_file) </code>
import numpy as np

def parse_input(file_path):
    """Read the localization problem description from a text file.

    The file is consumed line by line in this order: a "rows cols" header,
    ``rows`` whitespace-separated map rows, the observation count, that many
    observation lines, and finally the sensor error rate.

    Args:
        file_path: Path of the input file.

    Returns:
        Tuple ``(rows, cols, map_data, num_observations, observations,
        error_rate)`` where ``map_data`` is a list of token lists and
        ``observations`` is a list of stripped strings.
    """
    with open(file_path, 'r') as handle:

        def next_line():
            # readline() yields '' at EOF, so a truncated file surfaces as a
            # conversion error in the caller rather than StopIteration.
            return handle.readline().strip()

        header = next_line().split()
        rows, cols = map(int, header)

        map_data = []
        for _ in range(rows):
            map_data.append(next_line().split())

        num_observations = int(next_line())

        observations = []
        for _ in range(num_observations):
            observations.append(next_line())

        error_rate = float(next_line())

    return rows, cols, map_data, num_observations, observations, error_rate

def initialize_probabilities(rows, cols, map_data):
    """Build a uniform prior over the free ('0') cells of the map.

    Only cells inside the declared ``rows`` x ``cols`` window are considered,
    and the same set of cells is used both for counting and for assignment,
    so the returned probabilities always sum to 1 when any free cell exists.

    Args:
        rows: Number of map rows to scan.
        cols: Number of map columns to scan.
        map_data: Grid indexed as ``map_data[i][j]``; '0' marks a free cell.

    Returns:
        Dict mapping ``(i, j)`` to ``1 / number_of_free_cells`` in row-major
        order, or an empty dict when the map has no free cell.
    """
    open_cells = [
        (i, j)
        for i in range(rows)
        for j in range(cols)
        if map_data[i][j] == '0'
    ]
    if not open_cells:
        # Nothing to localize in; avoid dividing by zero.
        return {}
    uniform = 1 / len(open_cells)
    return dict.fromkeys(open_cells, uniform)

def get_neighbors(x, y, rows, cols, map_data):
    """List the free cells directly adjacent to ``(x, y)``.

    Candidates are examined in the fixed order up, down, left, right
    (row index first). A candidate is kept only if it lies inside the
    ``rows`` x ``cols`` grid and its map entry is '0'.

    Returns:
        List of ``(row, col)`` tuples, possibly empty.
    """
    candidates = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
    return [
        (r, c)
        for r, c in candidates
        if 0 <= r < rows and 0 <= c < cols and map_data[r][c] == '0'
    ]

def build_transition_matrix(rows, cols, map_data):
    """Build the move model: each free cell steps to a free neighbor uniformly.

    Returns:
        Dict mapping every free cell ``(i, j)`` to a dict of
        ``neighbor -> 1 / number_of_neighbors``. A free cell with no free
        neighbor maps to an empty dict.
    """
    table = {}
    for r in range(rows):
        for c in range(cols):
            if map_data[r][c] != '0':
                continue
            reachable = get_neighbors(r, c, rows, cols, map_data)
            outgoing = {}
            for cell in reachable:
                # Division stays inside the loop so an empty neighbor list
                # never divides by zero.
                outgoing[cell] = 1 / len(reachable)
            table[(r, c)] = outgoing
    return table

def sensor_reading_to_bits(sensor_reading):
    """Convert a sensor reading string into a direction -> bit mapping.

    Args:
        sensor_reading: Sequence whose first four items are digits accepted
            by ``int()``; extra trailing items are ignored.

    Returns:
        Dict with keys 'N', 'S', 'W', 'E' mapped to the integer bits.

    Raises:
        IndexError: If the reading has fewer than four items.
        ValueError: If an item cannot be converted with ``int()``.
    """
    # NOTE(review): bits are decoded in N, S, W, E order. Some statements of
    # this localization task list the bits as N, E, S, W -- confirm against
    # the input specification before trusting the results.
    directions = ('N', 'S', 'W', 'E')
    return {
        direction: int(sensor_reading[k])
        for k, direction in enumerate(directions)
    }


def calculate_emission_probability(sensor_bits, actual_bits, error_rate):
    """Return the likelihood of a reading given the true surroundings.

    Each sensor bit is treated as independently wrong with probability
    ``error_rate``, so the result is
    ``(1 - error_rate) ** matches * error_rate ** mismatches``.

    Args:
        sensor_bits: Observed direction -> bit mapping.
        actual_bits: True direction -> bit mapping; must contain every key
            of ``sensor_bits``.
        error_rate: Per-bit error probability.
    """
    total = len(sensor_bits)
    distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits)
    return (1 - error_rate) ** (total - distance) * error_rate ** distance


def _obstacle_bit(r, c, rows, cols, map_data):
    """Return 1 if ``(r, c)`` is off the map or holds an 'X', else 0."""
    if not (0 <= r < rows and 0 <= c < cols):
        # The robot cannot leave the grid (get_neighbors rejects off-map
        # cells), so the sensor model must see the border as an obstacle.
        return 1
    return 1 if map_data[r][c] == 'X' else 0


def build_emission_matrix(rows, cols, map_data, observations, error_rate):
    """Compute P(observation | cell) for every observation and free cell.

    Args:
        rows: Number of map rows.
        cols: Number of map columns.
        map_data: Grid indexed as ``map_data[i][j]``; '0' is free, 'X' is an
            obstacle.
        observations: Iterable of sensor reading strings.
        error_rate: Per-bit sensor error probability.

    Returns:
        Dict keyed by observation string; each value maps a free cell
        ``(i, j)`` to the emission probability of that observation there.
    """
    # The true surroundings depend only on the map, so compute them once
    # rather than once per observation.
    actual_bits_by_cell = {}
    for i in range(rows):
        for j in range(cols):
            if map_data[i][j] == '0':
                actual_bits_by_cell[(i, j)] = {
                    'N': _obstacle_bit(i - 1, j, rows, cols, map_data),
                    'S': _obstacle_bit(i + 1, j, rows, cols, map_data),
                    'W': _obstacle_bit(i, j - 1, rows, cols, map_data),
                    'E': _obstacle_bit(i, j + 1, rows, cols, map_data),
                }

    emissions = {}
    for obs in observations:
        if obs in emissions:
            # A repeated reading would produce the identical table.
            continue
        obs_bits = sensor_reading_to_bits(obs)
        emissions[obs] = {
            cell: calculate_emission_probability(obs_bits, actual_bits, error_rate)
            for cell, actual_bits in actual_bits_by_cell.items()
        }
    return emissions

def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix):
    """Fill the Viterbi trellis of best-path probabilities.

    For step 0 the entry is ``prior * emission``; for each later step it is
    the maximum over predecessor cells of
    ``previous_entry * transition * emission``.

    Args:
        rows: Unused; kept so existing callers keep working.
        cols: Unused; kept so existing callers keep working.
        map_data: Unused; kept so existing callers keep working.
        observations: Sequence of observation keys into ``emission_matrix``.
        initial_probabilities: Dict ``cell -> prior probability``; its keys
            define the states of the trellis.
        transition_matrix: Dict ``prev_cell -> {cell: probability}``.
        emission_matrix: Dict ``observation -> {cell: probability}``.

    Returns:
        Dict mapping each cell to a list with one probability per
        observation. With no observations every list is empty.
    """
    num_steps = len(observations)
    trellis = {pos: [0.0] * num_steps for pos in initial_probabilities}
    if num_steps == 0:
        # Nothing observed: return the empty trellis instead of indexing
        # into observations[0].
        return trellis

    # Initialization
    first_emission = emission_matrix[observations[0]]
    for pos, prior in initial_probabilities.items():
        trellis[pos][0] = prior * first_emission[pos]

    # Invert the transition table once so each step only visits the actual
    # predecessors of a cell instead of scanning every state.
    predecessors = {pos: [] for pos in initial_probabilities}
    for prev_pos, successors in transition_matrix.items():
        for pos, move_prob in successors.items():
            if pos in predecessors:
                predecessors[pos].append((prev_pos, move_prob))

    # Iteration
    for t in range(1, num_steps):
        emission = emission_matrix[observations[t]]
        for pos, incoming in predecessors.items():
            best = 0.0
            for prev_pos, move_prob in incoming:
                candidate = trellis[prev_pos][t - 1] * move_prob * emission[pos]
                if candidate > best:
                    best = candidate
            trellis[pos][t] = best

    return trellis

def save_output(trellis, file_path, rows, cols):
    """Write the trellis to an ``.npz`` archive, one grid per time step.

    The number of time steps is taken from the first trellis entry. Each
    step becomes a ``rows`` x ``cols`` array that is zero except at the
    cells present in ``trellis``; arrays are passed positionally to
    ``np.savez``.

    Args:
        trellis: Dict ``(row, col) -> sequence of per-step probabilities``.
        file_path: Destination handed to ``np.savez``.
        rows: Grid height.
        cols: Grid width.
    """
    first_series = next(iter(trellis.values()))
    grids = []
    for _ in range(len(first_series)):
        grids.append(np.zeros((rows, cols)))

    for (r, c), series in trellis.items():
        for step, value in enumerate(series):
            grids[step][r][c] = value

    np.savez(file_path, *grids)

def main(input_file, output_file):
    """Run the whole pipeline: parse, build the models, run Viterbi, save.

    Args:
        input_file: Path of the problem description read by ``parse_input``.
        output_file: Destination path passed to ``save_output``.
    """
    parsed = parse_input(input_file)
    rows, cols, map_data, _num_observations, observations, error_rate = parsed

    prior = initialize_probabilities(rows, cols, map_data)
    moves = build_transition_matrix(rows, cols, map_data)
    sensor_model = build_emission_matrix(rows, cols, map_data, observations, error_rate)

    result = viterbi(rows, cols, map_data, observations, prior, moves, sensor_model)
    save_output(result, output_file, rows, cols)

# Script entry point: python <script> INPUT_FILE [OUTPUT_FILE]
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback.
        sys.exit("usage: {} INPUT_FILE [OUTPUT_FILE]".format(sys.argv[0]))
    input_file = sys.argv[1]
    # The output path stays 'output.npz' unless a second argument is given.
    output_file = sys.argv[2] if len(sys.argv) > 2 else 'output.npz'
    main(input_file, output_file)

The output probabilities seem off, and I’m not sure if the issue is with the initialization, transition, or emission probabilities, or perhaps the Viterbi algorithm itself. Any insights on where my implementation might be going wrong would be greatly appreciated!

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật