""" DarkWorldsMetricMountianOsteric.py
Custom evaluation metric for the 'Observing Dark Worlds' competition.
[Description of metric, or reference to documentation.]
Update: Made for the training set only so users can check there results from the training c
@Author: David Harvey
Created: 22 August 2012
"""
import numpy as np
import math as mt
import itertools as it
import csv as c
import getopt as gt
import sys as sys
import argparse as ap
import string as st
import random as rd
def calc_delta_r(x_predicted,y_predicted,x_true,y_true):
""" Compute the scalar distance between predicted halo centers
and the true halo centers. Predictions are matched to the closest
halo center.
Notes: It takes in the predicted and true positions, and then loops over each possible configuration and finds the most optimal one.
Arguments:
x_predicted, y_predicted: vector for predicted x- and y-positions (1 to 3 elements)
x_true, y_true: vector for known x- and y-positions (1 to 3 elements)
Returns:
radial_distance: vector containing the scalar distances between the predicted halo centres and the true halo centres (1 to 3 elements)
true_halo_indexes: vector containing indexes of the input true halos which matches the predicted halo indexes (1 to 3 elements)
measured_halo_indexes: vector containing indexes of the predicted halo position with the reference to the true halo position.
e.g if true_halo_indexes=[0,1] and measured_halo_indexes=[1,0] then the first x,y coordinates of the true halo position matches the second input of the predicted x,y coordinates.
"""
num_halos=len(x_true)
num_configurations=mt.factorial(num_halos)
configurations=np.zeros([num_halos,num_configurations],int)
distances = np.zeros([num_configurations],float)
radial_distance=[]
a=['01','012']
count=0
true_halo_indexes=[]
predicted_halo_indexes=[]
distances_perm=np.zeros([num_configurations,num_halos],float)
true_halo_indexes_perm=[]
predicted_halo_indexes_perm=[]
for perm in it.permutations(a[num_halos-2],num_halos):
which_true_halos=[]
which_predicted_halos=[]
for j in range(num_halos):
distances_perm[count,j]=np.sqrt((x_true[j]-x_predicted[int(perm[j])])**2\
+(y_true[j]-y_predicted[int(perm[j])])**2)
which_true_halos.append(j)
which_predicted_halos.append(int(perm[j]))
true_halo_indexes_perm.append(which_true_halos)
predicted_halo_indexes_perm.append(which_predicted_halos)
distances[count]=sum(distances_perm[count,0::])
count=count+1
config = np.where(distances == min(distances))[0][0]
radial_distance.append(distances_perm[config,0::])
true_halo_indexes=true_halo_indexes_perm[config]
predicted_halo_indexes=predicted_halo_indexes_perm[config]
return radial_distance,true_halo_indexes,predicted_halo_indexes
def calc_theta(x_predicted, y_predicted, x_true, y_true, x_ref, y_ref):
""" Calculate the angle the predicted position and the true position, where the zero degree corresponds to the line joing the true halo position and the reference point given.
Arguments:
x_predicted, y_predicted: vector for predicted x- and y-positions (1 to 3 elements)
x_true, y_true: vector for known x- and y-positions (1 to 3 elements)
Note that the input of these are matched up so that the first elements of each
vector are associated with one another
x_ref, y_ref: scalars of the x,y coordinate of reference point
Returns:
Theta: A vector containing the angles of the predicted halo w.r.t the true halo
with the vector joining the reference point and the halo as the zero line.
"""
num_halos=len(x_predicted)
theta=np.zeros([num_halos+1],float)
phi = np.zeros([num_halos],float)
psi = np.arctan( (y_true-y_ref)/(x_true-x_ref) )
phi[x_true != x_ref] = np.arctan((y_predicted[x_true != x_predicted]-\
y_true[x_true != x_predicted])\
/(x_predicted[x_true != x_predicted]-\
x_true[x_true != x_predicted]))
phi =convert_to_360(phi, x_predicted-x_true,\
y_predicted-y_true)
psi = convert_to_360(psi, x_true-x_ref,\
y_true-y_ref)
theta = phi-psi
theta[theta< 0.0]=theta[theta< 0.0]+2.0*mt.pi
return theta
def convert_to_360(angle, x_in, y_in):
""" Convert the given angle to the true angle in the range 0:2pi
Arguments:
angle:
x_in, y_in: the x and y coordinates used to determine the quartile
the coordinate lies in so to add of pi or 2pi
Returns:
theta: the angle in the range 0:2pi
"""
n = len(x_in)
for i in range(n):
if x_in[i] < 0 and y_in[i] > 0:
angle[i] = angle[i]+mt.pi
elif x_in[i] < 0 and y_in[i] < 0:
angle[i] = angle[i]+mt.pi
elif x_in[i] > 0 and y_in[i] < 0:
angle[i] = angle[i]+2.0*mt.pi
elif x_in[i] == 0 and y_in[i] == 0:
angle[i] = 0
elif x_in[i] == 0 and y_in[i] > 0:
angle[i] = mt.pi/2.
elif x_in[i] < 0 and y_in[i] == 0:
angle[i] = mt.pi
elif x_in[i] == 0 and y_in[i] < 0:
angle[i] = 3.*mt.pi/2.
return angle
def get_ref(x_halo,y_halo,weight):
""" Gets the reference point of the system of halos by weighted averaging the x and y
coordinates.
Arguments:
x_halo, y_halo: Vector num_halos referring to the coordinates of the halos
weight: the weight which will be assigned to the position of the halo
num_halos: number of halos in the system
Returns:
x_ref, y_ref: The coordinates of the reference point for the metric
"""
x_ref = np.sum([x_halo*weight])/np.sum([weight])
y_ref = np.sum([y_halo*weight])/np.sum([weight])
return x_ref,y_ref
def main_score( nhalo_all, x_true_all, y_true_all, x_ref_all, y_ref_all, sky_prediction):
"""abstracts the score from the old command-line interface.
sky_prediction is a dx2 array of predicted x,y positions
-camdp"""
r=np.array([],dtype=float)
angle=np.array([],dtype=float)
num_halos_total=0
for selectskyinsolutions, sky in enumerate(sky_prediction):
nhalo=int(nhalo_all[selectskyinsolutions])
x_true=x_true_all[selectskyinsolutions][0:nhalo]
y_true=y_true_all[selectskyinsolutions][0:nhalo]
x_predicted=np.array([],dtype=float)
y_predicted=np.array([],dtype=float)
for i in range(nhalo):
x_predicted=np.append(x_predicted,float(sky[0]))
y_predicted=np.append(y_predicted,float(sky[1]))
x_ref=x_ref_all[selectskyinsolutions]
y_ref=y_ref_all[selectskyinsolutions]
num_halos_total=num_halos_total+nhalo
if nhalo == 1:
r=np.append(r,np.sqrt( (x_predicted-x_true)**2 \
+ (y_predicted-y_true)**2))
if (x_predicted-x_true) != 0:
psi = np.arctan((y_predicted-y_true)/(x_predicted-x_true))
else: psi=0.
theta = convert_to_360([psi], [x_predicted-x_true], [y_predicted-y_true])
angle=np.append(angle,theta)
else:
r_index_index = calc_delta_r(x_predicted, y_predicted, x_true, \
y_true)
r=np.append(r,r_index_index[0][0])
halo_index= r_index_index[1]
predicted_index=r_index_index[2]
angle=np.append(angle,calc_theta\
(x_predicted[predicted_index],\
y_predicted[predicted_index],\
x_true[halo_index],\
y_true[halo_index],x_ref,\
y_ref))
av_r=sum(r)/len(r)
N = float(num_halos_total)
angle_vec = np.sqrt(( 1.0/N * sum(np.cos(angle)) )**2 + \
( 1.0/N * sum(np.sin(angle)) )**2)
W1=1./1000.
W2=1.
metric = W1*av_r + W2*angle_vec
print('Your average distance in pixels you are away from the true halo is', av_r)
print('Your average angular vector is', angle_vec)
print('Your score for the training data is', metric)
return metric
def main(user_fname, fname):
""" Script to compute the evaluation metric for the Observing Dark Worlds competition. You can run it on your training data to understand how well you have done with the training data.
"""
r=np.array([],dtype=float)
angle=np.array([],dtype=float)
true_sky_id=[]
sky_loader = c.reader(open(fname, 'rb'))
for row in sky_loader:
true_sky_id.append(row[0])
nhalo_all=np.loadtxt(fname,usecols=(1,),delimiter=',',skiprows=1)
x_true_all=np.loadtxt(fname,usecols=(4,6,8),delimiter=',',skiprows=1)
y_true_all=np.loadtxt(fname,usecols=(5,7,9),delimiter=',',skiprows=1)
x_ref_all=np.loadtxt(fname,usecols=(2,),delimiter=',',skiprows=1)
y_ref_all=np.loadtxt(fname,usecols=(3,),delimiter=',',skiprows=1)
for row in sky_loader:
true_sky_id.append(row[1])
num_halos_total=0
sky_prediction = c.reader(open(user_fname, 'rb'))
try:
with open(user_fname, 'r') as f:
header = float((f.readline()).split(',')[1])
print('THE INPUT FILE DOES NOT APPEAR TO HAVE A HEADER')
except :
print('THE INPUT FILE APPEARS TO HAVE A HEADER, SKIPPING THE FIRST LINE')
skip_header = sky_prediction.next()
for sky in sky_prediction:
sky_id = str(sky[0])
does_it_exist=true_sky_id.count(sky_id)
if does_it_exist > 0:
selectskyinsolutions=true_sky_id.index(sky_id)-1
else:
print('Sky_id does not exist, formatting problem: ',sky_id)
sys.exit(2)
nhalo=int(nhalo_all[selectskyinsolutions])
x_true=x_true_all[selectskyinsolutions][0:nhalo]
y_true=y_true_all[selectskyinsolutions][0:nhalo]
x_predicted=np.array([],dtype=float)
y_predicted=np.array([],dtype=float)
for i in range(nhalo):
x_predicted=np.append(x_predicted,float(sky[2*i+1]))
y_predicted=np.append(y_predicted,float(sky[2*i+2]))
x_ref=x_ref_all[selectskyinsolutions]
y_ref=y_ref_all[selectskyinsolutions]
num_halos_total=num_halos_total+nhalo
if nhalo == 1:
r=np.append(r,np.sqrt( (x_predicted-x_true)**2 \
+ (y_predicted-y_true)**2))
if (x_predicted-x_true) != 0:
psi = np.arctan((y_predicted-y_true)/(x_predicted-x_true))
else: psi=0.
theta = convert_to_360([psi], [x_predicted-x_true], [y_predicted-y_true])
angle=np.append(angle,theta)
else:
r_index_index = calc_delta_r(x_predicted, y_predicted, x_true, \
y_true)
r=np.append(r,r_index_index[0][0])
halo_index= r_index_index[1]
predicted_index=r_index_index[2]
angle=np.append(angle,calc_theta\
(x_predicted[predicted_index],\
y_predicted[predicted_index],\
x_true[halo_index],\
y_true[halo_index],x_ref,\
y_ref))
av_r=sum(r)/len(r)
N = float(num_halos_total)
angle_vec = np.sqrt(( 1.0/N * sum(np.cos(angle)) )**2 + \
( 1.0/N * sum(np.sin(angle)) )**2)
W1=1./1000.
W2=1.
metric = W1*av_r + W2*angle_vec
print('Your average distance in pixels you are away from the true halo is', av_r)
print('Your average angular vector is', angle_vec)
print('Your score for the training data is', metric)
if __name__ == "__main__":
parser = ap.ArgumentParser(description='Work out the Metric for your input file')
parser.add_argument('inputfile',type=str,nargs=1,help='Input file of halo positions. Needs to be in the format SkyId,halo_x1,haloy1,halox_2,halo_y2,halox3,halo_y3 ')
parser.add_argument('reffile',type=str,nargs=1,help='This should point to Training_halos.csv')
args = parser.parse_args()
user_fname=args.inputfile[0]
filename = (args.reffile[0]).count('Training_halos.csv')
if filename == 0:
fname=args.reffile[0]+str('Training_halos.csv')
else:
fname=args.reffile[0]
main(user_fname, fname)