# Priprema DataSeta / utility.py (shared via CoCalc)
import numpy as np
import sys
import inspect
import os
import pandas
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn import model_selection, tree, neighbors, ensemble, utils
from sklearn.metrics import mean_squared_error
from sklearn.kernel_ridge import KernelRidge

def error_mean_square_distance(x, y):
    """Return the mean squared error between ``x`` and ``y``.

    Thin wrapper that forwards both arguments, in order, to
    ``sklearn.metrics.mean_squared_error``.
    """
    mse = mean_squared_error(x, y)
    return mse

def test_regression(x_train, y_train, x_test, y_test, model_fit=None):
    """Fit bagged regressors over a small parameter grid and keep the best one.

    For every combination in the bagging grid a ``BaggingRegressor`` is fitted
    on the training data and scored on the test data with the mean squared
    error; the model with the lowest error wins and its summary is printed.

    :param x_train: training features passed to ``fit``.
    :param y_train: training targets passed to ``fit``.
    :param x_test: features used for prediction and scoring.
    :param y_test: targets the predictions are compared against.
    :param model_fit: base estimator selector: ``"SVR"``, ``"KNR"``, ``"DTR"``
        or ``"KRR"``; any other value (including ``None``) leaves the base
        estimator unset so the bagging default is used.
    :return: predictions of the best model on ``x_test`` (``None`` if the
        grid produced no model).
    """
    rng = utils.check_random_state(0)
    msg_info = """type: %s;
        mse: %f;
        Parameters: %s with a score of %0.2f;"""
    c = [0.1]  # wider alternative considered: np.linspace(0.1, 2000.0, num=3)
    # NOTE(review): the gamma grid starts at 0.0; recent scikit-learn releases
    # may reject gamma=0 for kernel estimators -- confirm.
    gamma = np.linspace(0.0, 5.0, num=3)
    grid = model_selection.ParameterGrid({"max_samples": [0.5, 1.0],
                                          "max_features": [0.5, 1.0],
                                          "bootstrap": [True, False],
                                          "bootstrap_features": [True, False]})
    if model_fit == "SVR":
        grid_svr = [{'kernel': ['linear'], 'C': c},
                    {'kernel': ['rbf'], 'C': c, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'C': c, 'gamma': gamma}]
        base_estimator = model_selection.GridSearchCV(SVR(), cv=5, param_grid=grid_svr)
    elif model_fit == "KNR":
        grid_knr = [{'weights': ['uniform', 'distance'], 'algorithm': ['auto'], 'p': [2]}]
        base_estimator = model_selection.GridSearchCV(neighbors.KNeighborsRegressor(), param_grid=grid_knr)
    elif model_fit == "DTR":
        base_estimator = tree.DecisionTreeRegressor()
    elif model_fit == "KRR":
        # alpha = 1 / (2 * C), computed element-wise: ``c`` is a plain list and
        # cannot be multiplied by a float directly.
        alpha = [1 / (2. * c_value) for c_value in c]
        grid_krr = [{'kernel': ['linear'], 'alpha': alpha},
                    {'kernel': ['rbf'], 'alpha': alpha, 'gamma': gamma},
                    {'kernel': ['sigmoid'], 'alpha': alpha, "gamma": gamma}]
        base_estimator = model_selection.GridSearchCV(KernelRidge(), cv=5, param_grid=grid_krr)
    else:
        base_estimator = None

    mod_pred = None
    y_pred = None
    error_pred = None
    for params in grid:
        # The base estimator is passed positionally because its keyword was
        # renamed from ``base_estimator`` to ``estimator`` across releases.
        mod_pred_temp = ensemble.BaggingRegressor(base_estimator, random_state=rng, **params)
        mod_pred_temp.fit(x_train, y_train)
        y_pred_temp = mod_pred_temp.predict(x_test)
        error_pred_temp = error_mean_square_distance(y_test, y_pred_temp)
        # Keep the first model, then any model with a strictly lower error.
        if error_pred is None or error_pred_temp < error_pred:
            error_pred = error_pred_temp
            y_pred = y_pred_temp
            mod_pred = mod_pred_temp
    if mod_pred is not None:
        print(msg_info % (model_fit, error_pred, mod_pred.get_params(), mod_pred.score(x_test, y_test)))
    return y_pred

def get_file_list_from_root_dir(root_dir, extension=".xlsx"):
    """List the entries of ``root_dir`` whose names end with ``extension``.

    :param root_dir: directory to scan (not recursive).
    :param extension: file-name suffix to keep.
    :return: list of matching entries, each joined with ``root_dir``.
    """
    return [os.path.join(root_dir, entry)
            for entry in os.listdir(root_dir)
            if entry.endswith(extension)]

def collect_data_from_file_list(file_list, sheet_name):
    """Read one sheet from every Excel file and key the frames by file name.

    Loading is best effort: a file that cannot be opened or parsed is
    reported on stdout and skipped, so the remaining files still load.

    :param file_list: paths of the Excel files to read.
    :param sheet_name: sheet to read from each workbook.
    :return: dict mapping each file's base name (without extension) to the
        ``DataFrame`` read from it, using the first column as index.
    """
    data = {}
    for path in file_list:
        base_name_without_ext = os.path.splitext(os.path.basename(path))[0]
        try:
            # The context manager closes the handle even when parsing fails.
            with open(path, 'rb') as workbook:
                data[base_name_without_ext] = pandas.read_excel(workbook, index_col=0,
                                                                sheet_name=sheet_name)
        except Exception as e:
            print(e, base_name_without_ext)
    return data

def extract_vectors_of_specific_data(data, st_name_t_stamps, mark):
    """Extract one vector of ``mark`` values per (stock name, time stamps) pair.

    :param data: mapping from stock name to that stock's data table.
    :param st_name_t_stamps: sequence of ``(stock_name, t_stamps)`` pairs.
    :param mark: key of the value to extract, forwarded to
        ``extract_vectors_of_specific_data_1d``.
    :return: when exactly one pair is given, that pair's vector itself;
        otherwise a list with one vector per pair.
    """
    single_request = len(st_name_t_stamps) == 1
    vecs = []
    for stock_name, t_stamp in st_name_t_stamps:
        vec = extract_vectors_of_specific_data_1d(data[stock_name], t_stamp, mark)
        if single_request:
            # A lone request is returned un-nested.
            return vec
        vecs.append(vec)
    return vecs

def extract_vectors_of_specific_data_1d(data, st_name_t_stamps, mark):
    """Collect the ``mark`` value of ``data`` at each requested time stamp.

    :param data: table for a single stock.
    :param st_name_t_stamps: iterable of time stamps (row labels) to look up.
    :param mark: column label of the value to extract.
    :return: list with one value per time stamp, in the given order.
    """
    vec = []
    for t_stamp in st_name_t_stamps:
        # NOTE(review): the loop body was missing from the source; it is
        # reconstructed as a label lookup, assuming ``data`` is a pandas
        # DataFrame indexed by time stamp -- confirm against the original.
        vec.append(data.loc[t_stamp, mark])
    return vec

def plot_data(x, y, title="", xlabel="", ylabel=""):
    """Plot ``y`` against ``x`` on a new 20x5 figure.

    :param x: values for the horizontal axis.
    :param y: values for the vertical axis.
    :param title: figure title.
    :param xlabel: label of the horizontal axis.
    :param ylabel: label of the vertical axis.
    """
    plt.figure(figsize=(20, 5))
    plt.plot(x, y)
    # Apply the caller-supplied captions, which were previously ignored.
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

def period_error(period, length):
    """Validate that ``period`` fits a data series of ``length`` points.

    The period must be greater than 0 and not larger than ``length``. The
    error message names the calling function to make the failing indicator
    easy to spot.

    :param period: window length requested by the caller.
    :param length: number of points in the caller's data.
    :raises ValueError: if ``period`` is not positive or exceeds ``length``.
    """
    if length < period or period <= 0:
        # currentframe() may return None on interpreters without frame support.
        frame = inspect.currentframe()
        caller = frame.f_back if frame is not None else None
        caller_name = caller.f_code.co_name if caller is not None else "<unknown>"
        error_massage = '\n'.join((
            "Function name: " + caller_name,
            "Length of data is: %d" % length,
            "Period is: %d" % period,
            "We need period to be grather then 0 and less then length of data",
        ))
        # Propagate instead of raising and printing in the same place, so an
        # invalid period cannot silently flow into the indicator maths.
        raise ValueError(error_massage)

def relative_strength_index(close_prices_vecs, period):
    """Compute the Relative Strength Index (RSI) for one or several series.

    RSI is a technical momentum indicator that compares the magnitude of
    recent gains to recent losses in an attempt to determine overbought and
    oversold conditions of an asset:
    RSI = 100 - [100 / (1 + RS)], where
    RS = average of x days' up closes / average of x days' down closes.

    :param close_prices_vecs: a single sequence of closing prices, or a
        sequence of such sequences.
    :param period: number of days used for the averages.
    :return: the RSI values for a single series, or a list with one RSI
        vector per series.
    """
    rsi = []
    for prices_vec in close_prices_vecs:
        if not np.shape(prices_vec):
            # A scalar element means the input itself is one series; compute
            # it once instead of once per element.
            return relative_strength_index_1d(close_prices_vecs, period)
        rsi.append(relative_strength_index_1d(prices_vec, period))
    return rsi

def relative_strength_index_1d(close_prices_vec, period):
    """Compute the RSI of a single series of closing prices.

    The average gain and loss are seeded from the first ``period`` price
    changes and then updated with a running average over ``period``. When the
    average loss is zero the gain/loss ratio is taken as 100.

    :param close_prices_vec: sequence of closing prices.
    :param period: smoothing window, checked with ``period_error``.
    :return: list of ``len(close_prices_vec) - period`` RSI values.
    """
    n_prices = len(close_prices_vec)
    period_error(period, n_prices)

    changes = np.diff(close_prices_vec)
    head = changes[:period]
    avg_gain = head[head >= 0].sum() / period
    avg_loss = -head[head < 0].sum() / period

    n_out = n_prices - period
    result = [0] * n_out
    for offset in range(n_out):
        # NOTE(review): changes[period - 1] belongs to the seed and is also the
        # first change folded in here -- confirm this double use is intended.
        change = changes[offset + period - 1]
        gain = change if change >= 0 else 0
        loss = -change if change < 0 else 0
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
        if avg_loss != 0:
            ratio = avg_gain / avg_loss
        else:
            ratio = 100
        result[offset] = 100. - 100. / (1. + ratio)
    return result

def money_flow_index(typc_prices_vecs, volume_data_vecs, period):
    """Compute the Money Flow Index (MFI) for one or several series.

    MFI measures the strength of money flowing in and out of a security:
    Money Flow (MF) = Typical Price * Volume,
    Money Ratio (MR) = Positive MF / Negative MF,
    MFI = 100 - (100 / (1 + MR)).

    :param typc_prices_vecs: a single sequence of typical prices, or a
        sequence of such sequences.
    :param volume_data_vecs: volume data shaped like ``typc_prices_vecs``.
    :param period: number of days in each money-flow window.
    :return: the MFI values for a single series, or a list with one MFI
        vector per series.
    """
    mfi = []
    for typical_prices_vec, volume_data_vec in zip(typc_prices_vecs, volume_data_vecs):
        if not np.shape(typical_prices_vec):
            # A scalar element means the inputs are single series; compute the
            # result once instead of once per element.
            return money_flow_index_1d(typc_prices_vecs, volume_data_vecs, period)
        mfi.append(money_flow_index_1d(typical_prices_vec, volume_data_vec, period))
    return mfi

def money_flow_index_1d(typical_prices_vec, volume_data_vec, period):
    """Compute the MFI of a single series of typical prices and volumes.

    For every window of ``period`` consecutive price changes, the money flow
    on non-negative changes is compared with the money flow on negative
    changes. When the negative flow is zero the ratio is taken as 100.

    :param typical_prices_vec: sequence of typical prices.
    :param volume_data_vec: sequence of traded volumes.
    :param period: window length, checked with ``period_error``.
    :return: list of ``len(typical_prices_vec) - period`` MFI values.
    """
    n_prices = len(typical_prices_vec)
    period_error(period, n_prices)

    price_changes = np.diff(typical_prices_vec)
    # NOTE(review): typical_prices_vec[1:] is zipped with volume_data_vec from
    # its first element, so each price meets the previous position's volume --
    # confirm this alignment is intended.
    flows = np.array([price * volume
                      for price, volume in zip(typical_prices_vec[1:], volume_data_vec)])

    result = []
    for start in range(0, n_prices - period):
        stop = start + period
        window_changes = price_changes[start:stop]
        window_flows = flows[start:stop]
        positive = window_flows[window_changes >= 0].sum() / period
        negative = window_flows[window_changes < 0].sum() / period
        if negative != 0:
            ratio = positive / negative
        else:
            ratio = 100
        result.append(100. - 100. / (1. + ratio))
    return result

def typical_prices_vecs(price_list_high, price_list_low, price_list_close):
    """Compute typical prices (mean of high, low, close) for one or several series.

    :param price_list_high: a single sequence of high prices, or a sequence
        of such sequences.
    :param price_list_low: low prices shaped like ``price_list_high``.
    :param price_list_close: closing prices shaped like ``price_list_high``.
    :return: the typical prices for a single series, or a list with one
        typical-price vector per series.
    """
    typ = []
    for high, low, close in zip(price_list_high, price_list_low, price_list_close):
        if not np.shape(high):
            # A scalar element means the inputs are single series; compute the
            # result once instead of once per element.
            return typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close)
        typ.append(typical_prices_vecs_1d(high, low, close))
    return typ

def typical_prices_vecs_1d(price_list_high, price_list_low, price_list_close):
    """Return the per-day mean of the high, low and closing prices.

    :param price_list_high: sequence of high prices.
    :param price_list_low: sequence of low prices.
    :param price_list_close: sequence of closing prices.
    :return: list with one mean per position, as long as the shortest input.
    """
    typical = []
    for day_prices in zip(price_list_high, price_list_low, price_list_close):
        typical.append(np.mean(list(day_prices)))
    return typical

def exponential_moving_average(close_price_vecs, period):
    """Compute the Exponential Moving Average (EMA) for one or several series.

    This indicator returns the exponential moving average of a field over a
    given period of time:
    EMA = [alpha * T close] + [(1 - alpha) * Y close],
    where T is today's close and Y is yesterday's close.

    :param close_price_vecs: a single sequence of closing prices, or a
        sequence of such sequences.
    :param period: number of days in the averaging window.
    :return: the EMA values for a single series, or a list with one EMA
        vector per series.
    """
    ema = []
    for close_price_vec in close_price_vecs:
        if not np.shape(close_price_vec):
            # A scalar element means the input itself is one series; compute
            # it once instead of once per element.
            return exponential_moving_average_1d(close_price_vecs, period)
        ema.append(exponential_moving_average_1d(close_price_vec, period))
    return ema

def exponential_moving_average_1d(close_prices_vec, period):
    """Compute the exponentially weighted moving average of a single series.

    The ``period`` weights are ``exp`` of evenly spaced values from -1 to 0,
    normalised to sum to one, and applied by convolution. The first
    ``period`` values of the smoothed series are discarded.

    :param close_prices_vec: sequence of closing prices.
    :param period: window length, checked with ``period_error``.
    :return: array of ``len(close_prices_vec) - period`` smoothed values.
    """
    period_error(period, len(close_prices_vec))

    prices = np.asarray(close_prices_vec)
    kernel = np.exp(np.linspace(-1., 0., period))
    kernel /= kernel.sum()

    # 'full' convolution is longer than the input; keep the leading part that
    # lines up with the prices, then drop the first ``period`` values.
    smoothed = np.convolve(prices, kernel, mode='full')
    aligned = smoothed[:len(prices)]
    return aligned[period:]

def stochastic_oscillator(high_prices_vecs, low_prices_vecs, close_prices_vecs, period):
    """Compute the Stochastic Oscillator (SO) for one or several series.

    The stochastic oscillator measures the difference between the current
    closing price of a security and its lowest low price, relative to its
    highest high price for a given period of time:
    %K = [(Close price - Lowest price) / (Highest price - Lowest price)] * 100.

    :param high_prices_vecs: a single sequence of high prices, or a sequence
        of such sequences.
    :param low_prices_vecs: low prices shaped like ``high_prices_vecs``.
    :param close_prices_vecs: closing prices shaped like ``high_prices_vecs``.
    :param period: number of days in the look-back window.
    :return: the %K values for a single series, or a list with one %K
        vector per series.
    """
    so = []
    for high_prices_vec, low_prices_vec, close_prices_vec in zip(high_prices_vecs, low_prices_vecs, close_prices_vecs):
        if not np.shape(high_prices_vec):
            # A scalar element means the inputs are single series; compute the
            # result once instead of once per element.
            return stochastic_oscillator_1d(high_prices_vecs, low_prices_vecs, close_prices_vecs, period)
        so.append(stochastic_oscillator_1d(high_prices_vec, low_prices_vec, close_prices_vec, period))
    return so

def stochastic_oscillator_1d(high_prices_vec, low_prices_vec, close_prices_vec, period):
    """Compute %K of the stochastic oscillator for a single series.

    Each output compares a closing price with the lowest low and highest high
    of a window of ``period + 1`` consecutive days. When the high/low range
    of a window is zero the value is 100.

    :param high_prices_vec: sequence of high prices.
    :param low_prices_vec: sequence of low prices.
    :param close_prices_vec: sequence of closing prices.
    :param period: look-back length, checked with ``period_error``.
    :return: list of ``len(high_prices_vec) - period`` %K values.
    """
    n_prices = len(high_prices_vec)
    period_error(period, n_prices)

    window_starts = range(0, n_prices - period)
    highest = [max(high_prices_vec[start:start + period + 1]) for start in window_starts]
    lowest = [min(low_prices_vec[start:start + period + 1]) for start in window_starts]

    close_above_low = np.subtract(close_prices_vec[period:], lowest)
    price_range = np.subtract(highest, lowest)

    result = []
    for numerator, denominator in zip(close_above_low, price_range):
        if denominator != 0:
            result.append((numerator / denominator) * 100)
        else:
            result.append(100)
    return result

def moving_average_convergence_divergence(close_prices_vecs, slow_period=26, fast_period=12):
    """Compute the MACD for one or several series of closing prices.

    Moving Average Convergence/Divergence is the difference between a short
    and a long term moving average of a field:
    MACD = [fast EMA of closing prices] - [slow EMA of closing prices].

    :param close_prices_vecs: a single sequence of closing prices, or a
        sequence of such sequences.
    :param slow_period: window of the long-term EMA.
    :param fast_period: window of the short-term EMA.
    :return: the MACD values for a single series, or a list with one MACD
        vector per series.
    """
    macd = []
    for close_prices_vec in close_prices_vecs:
        if not np.shape(close_prices_vec):
            # A scalar element means the input itself is one series; compute
            # it once instead of once per element.
            return moving_average_convergence_divergence_1d(close_prices_vecs, slow_period, fast_period)
        macd.append(moving_average_convergence_divergence_1d(close_prices_vec, slow_period, fast_period))
    return macd

def moving_average_convergence_divergence_1d(close_prices_vec, slow_period=26, fast_period=12):
    """Compute the MACD (fast EMA minus slow EMA) of a single series.

    The fast EMA is longer than the slow one, so its leading values are
    dropped to align both series on their last element.

    :param close_prices_vec: sequence of closing prices.
    :param slow_period: window of the long-term EMA.
    :param fast_period: window of the short-term EMA.
    :return: array with one MACD value per slow-EMA value.
    """
    len_cl_prices_vec = len(close_prices_vec)
    period_error(slow_period, len_cl_prices_vec)
    period_error(fast_period, len_cl_prices_vec)
    # Use the 1-D EMA directly: the input is a single series by contract, and
    # the shape-dispatching wrapper would re-inspect every element.
    emaslow = exponential_moving_average_1d(close_prices_vec, slow_period)
    emafast = exponential_moving_average_1d(close_prices_vec, fast_period)
    return np.subtract(emafast[len(emafast) - len(emaslow):], emaslow)

def signal_line(macd_data_vecs, period=9):
    """Compute the MACD signal line for one or several MACD series.

    Signal Line = ``period``-day EMA of MACD.

    :param macd_data_vecs: a single sequence of MACD values, or a sequence
        of such sequences.
    :param period: window of the EMA applied to the MACD values.
    :return: the signal line for a single series, or a list with one signal
        line per series.
    """
    sl = []
    for macd_data_vec in macd_data_vecs:
        if not np.shape(macd_data_vec):
            # A scalar element means the input itself is one series; compute
            # it once instead of once per element.
            return signal_line_1d(macd_data_vecs, period)
        sl.append(signal_line_1d(macd_data_vec, period))
    return sl

def signal_line_1d(macd_data_vec, period=9):
    """Compute the signal line (EMA of the MACD values) of a single series.

    :param macd_data_vec: sequence of MACD values.
    :param period: window of the EMA, checked with ``period_error``.
    :return: array of ``len(macd_data_vec) - period`` smoothed values.
    """
    len_macd_data_vec = len(macd_data_vec)
    period_error(period, len_macd_data_vec)
    # Use the 1-D EMA directly: the input is a single series by contract, and
    # the shape-dispatching wrapper would re-inspect every element.
    return exponential_moving_average_1d(macd_data_vec, period)