I’m working on a project to implement a Viterbi algorithm for robot localization in a grid map using sensor data. However, my implementation seems to produce incorrect results. I’m not sure where my mistake is. Below is my code:
<code>import numpy as np
def parse_input(file_path):
with open(file_path, 'r') as file:
rows, cols = map(int, file.readline().strip().split())
map_data = [file.readline().strip().split() for _ in range(rows)]
num_observations = int(file.readline().strip())
observations = [file.readline().strip() for _ in range(num_observations)]
error_rate = float(file.readline().strip())
return rows, cols, map_data, num_observations, observations, error_rate
def initialize_probabilities(rows, cols, map_data):
num_traversable = sum(row.count('0') for row in map_data)
initial_prob = 1 / num_traversable
probabilities = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
probabilities[(i, j)] = initial_prob
return probabilities
def get_neighbors(x, y, rows, cols, map_data):
neighbors = []
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if 0 <= nx < rows and 0 <= ny < cols and map_data[nx][ny] == '0':
neighbors.append((nx, ny))
return neighbors
def build_transition_matrix(rows, cols, map_data):
transitions = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
neighbors = get_neighbors(i, j, rows, cols, map_data)
transitions[(i, j)] = {neighbor: 1 / len(neighbors) for neighbor in neighbors}
return transitions
def sensor_reading_to_bits(sensor_reading):
directions = ['N', 'S', 'W', 'E']
return {directions[i]: int(sensor_reading[i]) for i in range(4)}
def calculate_emission_probability(sensor_bits, actual_bits, error_rate):
distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits)
return (1 - error_rate) ** (4 - distance) * error_rate ** distance
def build_emission_matrix(rows, cols, map_data, observations, error_rate):
emissions = {}
for obs in observations:
obs_bits = sensor_reading_to_bits(obs)
emission = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
actual_bits = {
'N': 1 if i > 0 and map_data[i-1][j] == 'X' else 0,
'S': 1 if i < rows-1 and map_data[i+1][j] == 'X' else 0,
'W': 1 if j > 0 and map_data[i][j-1] == 'X' else 0,
'E': 1 if j < cols-1 and map_data[i][j+1] == 'X' else 0,
}
emission[(i, j)] = calculate_emission_probability(obs_bits, actual_bits, error_rate)
emissions[obs] = emission
return emissions
def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix):
num_steps = len(observations)
trellis = {pos: [0] * num_steps for pos in initial_probabilities.keys()}
# Initialization
for pos in initial_probabilities.keys():
trellis[pos][0] = initial_probabilities[pos] * emission_matrix[observations[0]][pos]
# Iteration
for t in range(1, num_steps):
for pos in initial_probabilities.keys():
max_prob = 0
for prev_pos in transition_matrix.keys():
if pos in transition_matrix[prev_pos]:
prob = trellis[prev_pos][t-1] * transition_matrix[prev_pos][pos] * emission_matrix[observations[t]][pos]
if prob > max_prob:
max_prob = prob
trellis[pos][t] = max_prob
return trellis
def save_output(trellis, file_path, rows, cols):
maps = [np.zeros((rows, cols)) for _ in range(len(next(iter(trellis.values()))))]
for pos, probs in trellis.items():
x, y = pos
for t, prob in enumerate(probs):
maps[t][x][y] = prob
np.savez(file_path, *maps)
# Main function to run the entire process
def main(input_file, output_file):
rows, cols, map_data, num_observations, observations, error_rate = parse_input(input_file)
initial_probabilities = initialize_probabilities(rows, cols, map_data)
transition_matrix = build_transition_matrix(rows, cols, map_data)
emission_matrix = build_emission_matrix(rows, cols, map_data, observations, error_rate)
trellis = viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix)
save_output(trellis, output_file, rows, cols)
# Example usage
if __name__ == "__main__":
import sys
input_file = sys.argv[1]
output_file = 'output.npz'
main(input_file, output_file)
</code>
<code>import numpy as np
def parse_input(file_path):
with open(file_path, 'r') as file:
rows, cols = map(int, file.readline().strip().split())
map_data = [file.readline().strip().split() for _ in range(rows)]
num_observations = int(file.readline().strip())
observations = [file.readline().strip() for _ in range(num_observations)]
error_rate = float(file.readline().strip())
return rows, cols, map_data, num_observations, observations, error_rate
def initialize_probabilities(rows, cols, map_data):
num_traversable = sum(row.count('0') for row in map_data)
initial_prob = 1 / num_traversable
probabilities = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
probabilities[(i, j)] = initial_prob
return probabilities
def get_neighbors(x, y, rows, cols, map_data):
neighbors = []
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if 0 <= nx < rows and 0 <= ny < cols and map_data[nx][ny] == '0':
neighbors.append((nx, ny))
return neighbors
def build_transition_matrix(rows, cols, map_data):
transitions = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
neighbors = get_neighbors(i, j, rows, cols, map_data)
transitions[(i, j)] = {neighbor: 1 / len(neighbors) for neighbor in neighbors}
return transitions
def sensor_reading_to_bits(sensor_reading):
directions = ['N', 'S', 'W', 'E']
return {directions[i]: int(sensor_reading[i]) for i in range(4)}
def calculate_emission_probability(sensor_bits, actual_bits, error_rate):
distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits)
return (1 - error_rate) ** (4 - distance) * error_rate ** distance
def build_emission_matrix(rows, cols, map_data, observations, error_rate):
emissions = {}
for obs in observations:
obs_bits = sensor_reading_to_bits(obs)
emission = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
actual_bits = {
'N': 1 if i > 0 and map_data[i-1][j] == 'X' else 0,
'S': 1 if i < rows-1 and map_data[i+1][j] == 'X' else 0,
'W': 1 if j > 0 and map_data[i][j-1] == 'X' else 0,
'E': 1 if j < cols-1 and map_data[i][j+1] == 'X' else 0,
}
emission[(i, j)] = calculate_emission_probability(obs_bits, actual_bits, error_rate)
emissions[obs] = emission
return emissions
def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix):
num_steps = len(observations)
trellis = {pos: [0] * num_steps for pos in initial_probabilities.keys()}
# Initialization
for pos in initial_probabilities.keys():
trellis[pos][0] = initial_probabilities[pos] * emission_matrix[observations[0]][pos]
# Iteration
for t in range(1, num_steps):
for pos in initial_probabilities.keys():
max_prob = 0
for prev_pos in transition_matrix.keys():
if pos in transition_matrix[prev_pos]:
prob = trellis[prev_pos][t-1] * transition_matrix[prev_pos][pos] * emission_matrix[observations[t]][pos]
if prob > max_prob:
max_prob = prob
trellis[pos][t] = max_prob
return trellis
def save_output(trellis, file_path, rows, cols):
maps = [np.zeros((rows, cols)) for _ in range(len(next(iter(trellis.values()))))]
for pos, probs in trellis.items():
x, y = pos
for t, prob in enumerate(probs):
maps[t][x][y] = prob
np.savez(file_path, *maps)
# Main function to run the entire process
def main(input_file, output_file):
rows, cols, map_data, num_observations, observations, error_rate = parse_input(input_file)
initial_probabilities = initialize_probabilities(rows, cols, map_data)
transition_matrix = build_transition_matrix(rows, cols, map_data)
emission_matrix = build_emission_matrix(rows, cols, map_data, observations, error_rate)
trellis = viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix)
save_output(trellis, output_file, rows, cols)
# Example usage
if __name__ == "__main__":
import sys
input_file = sys.argv[1]
output_file = 'output.npz'
main(input_file, output_file)
</code>
import numpy as np
def parse_input(file_path):
with open(file_path, 'r') as file:
rows, cols = map(int, file.readline().strip().split())
map_data = [file.readline().strip().split() for _ in range(rows)]
num_observations = int(file.readline().strip())
observations = [file.readline().strip() for _ in range(num_observations)]
error_rate = float(file.readline().strip())
return rows, cols, map_data, num_observations, observations, error_rate
def initialize_probabilities(rows, cols, map_data):
num_traversable = sum(row.count('0') for row in map_data)
initial_prob = 1 / num_traversable
probabilities = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
probabilities[(i, j)] = initial_prob
return probabilities
def get_neighbors(x, y, rows, cols, map_data):
neighbors = []
for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nx, ny = x + dx, y + dy
if 0 <= nx < rows and 0 <= ny < cols and map_data[nx][ny] == '0':
neighbors.append((nx, ny))
return neighbors
def build_transition_matrix(rows, cols, map_data):
transitions = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
neighbors = get_neighbors(i, j, rows, cols, map_data)
transitions[(i, j)] = {neighbor: 1 / len(neighbors) for neighbor in neighbors}
return transitions
def sensor_reading_to_bits(sensor_reading):
directions = ['N', 'S', 'W', 'E']
return {directions[i]: int(sensor_reading[i]) for i in range(4)}
def calculate_emission_probability(sensor_bits, actual_bits, error_rate):
distance = sum(sensor_bits[d] != actual_bits[d] for d in sensor_bits)
return (1 - error_rate) ** (4 - distance) * error_rate ** distance
def build_emission_matrix(rows, cols, map_data, observations, error_rate):
emissions = {}
for obs in observations:
obs_bits = sensor_reading_to_bits(obs)
emission = {}
for i in range(rows):
for j in range(cols):
if map_data[i][j] == '0':
actual_bits = {
'N': 1 if i > 0 and map_data[i-1][j] == 'X' else 0,
'S': 1 if i < rows-1 and map_data[i+1][j] == 'X' else 0,
'W': 1 if j > 0 and map_data[i][j-1] == 'X' else 0,
'E': 1 if j < cols-1 and map_data[i][j+1] == 'X' else 0,
}
emission[(i, j)] = calculate_emission_probability(obs_bits, actual_bits, error_rate)
emissions[obs] = emission
return emissions
def viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix):
num_steps = len(observations)
trellis = {pos: [0] * num_steps for pos in initial_probabilities.keys()}
# Initialization
for pos in initial_probabilities.keys():
trellis[pos][0] = initial_probabilities[pos] * emission_matrix[observations[0]][pos]
# Iteration
for t in range(1, num_steps):
for pos in initial_probabilities.keys():
max_prob = 0
for prev_pos in transition_matrix.keys():
if pos in transition_matrix[prev_pos]:
prob = trellis[prev_pos][t-1] * transition_matrix[prev_pos][pos] * emission_matrix[observations[t]][pos]
if prob > max_prob:
max_prob = prob
trellis[pos][t] = max_prob
return trellis
def save_output(trellis, file_path, rows, cols):
maps = [np.zeros((rows, cols)) for _ in range(len(next(iter(trellis.values()))))]
for pos, probs in trellis.items():
x, y = pos
for t, prob in enumerate(probs):
maps[t][x][y] = prob
np.savez(file_path, *maps)
# Main function to run the entire process
def main(input_file, output_file):
rows, cols, map_data, num_observations, observations, error_rate = parse_input(input_file)
initial_probabilities = initialize_probabilities(rows, cols, map_data)
transition_matrix = build_transition_matrix(rows, cols, map_data)
emission_matrix = build_emission_matrix(rows, cols, map_data, observations, error_rate)
trellis = viterbi(rows, cols, map_data, observations, initial_probabilities, transition_matrix, emission_matrix)
save_output(trellis, output_file, rows, cols)
# Example usage
if __name__ == "__main__":
import sys
input_file = sys.argv[1]
output_file = 'output.npz'
main(input_file, output_file)
The output probabilities seem off, and I’m not sure if the issue is with the initialization, transition, or emission probabilities, or perhaps the Viterbi algorithm itself. Any insights on where my implementation might be going wrong would be greatly appreciated!