'''
Alexander Binks & Moritz Guenther, 2024
Licence: MIT 2024
This module contains functions to normalise, detrend, and clean lightcurves.
The cleaning and detrending of the lightcurve takes part in 9 steps, each of
which are called by the parent function 'run_make_lc_steps'. The steps are:
| (1) Normalise the raw lightcurve.
| (2) Split the lightcurve into segments of contiguous data (no large time gaps
between datapoints)
| (3) Remove sparse data, i.e., data segments with only a few data points.
| (4) Detrend the lightcurve.
| (5) Clean the edges: remove flux outliers.
| (6) Clean the edges: scattered, noisy datapoints.
| (7) Remove extreme outliers from the lightcurve
| (8) Normalise the flux in each segment, using the median of qualified data.
| (9) Store the lightcurve to file.
'''
###############################################################################
####################################IMPORTS####################################
###############################################################################
# Internal
import inspect
import sys
# Third party
import numpy as np
import os
import json
from astropy.table import Table
from astropy.stats import akaike_info_criterion_lsq
from scipy.stats import median_abs_deviation as MAD
from scipy.optimize import curve_fit
import itertools as it
from operator import itemgetter
# Local application
from .file_io import logger_tessilator
###############################################################################
###############################################################################
###############################################################################
# initialize the logger object
logger = logger_tessilator(__name__)
[docs]
def get_time_segments(t, t_fac=10.):
'''Split the lightcurve into groups of contiguous data
Group data into segments where the time difference between data points is
less than some threshold factor, `t_fac`.
parameters
----------
t : `Iter`
The list of time coordinates from the lightcurve
t_fac : `float`, optional, default=10.
A time factor, which is multiplied by the median cadence of the full
lightcurve.
returns
-------
ds : `list`
The start indices of each time segment
df : `list`
The final indices of each time segment
'''
td = np.zeros(len(t))
td[1:] = np.diff(t)
t_arr = (td <= t_fac*np.median(td)).astype(int)
groups = (list(group) for key, group in it.groupby(enumerate(t_arr),
key=itemgetter(1))
if key)
ss = [[group[0][0], group[-1][0]] for group in groups
if group[-1][0] > group[0][0]]
ss = np.array(ss).T
ds, df = ss[0,:], ss[1,:]
ds[1:] = [ds[i]-1 for i in range(1,len(ds))]
df = np.array([(i+1) for i in df])
return ds, df
[docs]
def remove_sparse_data(x_first, x_last, min_crit_frac=.05, min_crit_num=50):
'''Removes very sparse data groups, when there are 3 or more groups.
Calculate the mean (mean_group) and standard deviation (std_group) for the
number of data points in each group (n_points).
If std_group > "std_crit", then only keep groups with n_points > std_crit.
parameters
----------
x_first : `Iterable`
The index values of the first element in each group
x_last : `Iterable`
The index values of the last element each group
min_crit_frac : `float`, optional, default=.05
The minimum relative size of a flux component when correcting for
sparse data in the cleaning functions.
min_crit_num : `int`, optional, default=50
The minimum number of data points required for a flux component in the
sparse data cleaning functions.
returns
-------
y_first : `np.array`
The index values of the first element of the new arrays
y_last : `np.array`
The index values of the last element of the new arrays
'''
try:
n_points = np.array([x_last[i] - x_first[i]
for i in range(len(x_first))])
n_tot = np.sum(n_points)
std_crit = max(min_crit_num, min_crit_frac*n_tot)
y_first, y_last = np.array(x_first), np.array(x_last)
if len(x_first) > 2:
std_group = np.std(n_points)
if std_group > std_crit:
g = n_points > std_crit
y_first, y_last = y_first[g], y_last[g]
return y_first, y_last
return y_first, y_last
except:
logger.warning("The sparse data removal algorithm failed. Retaining "
"the input indices.")
return y_first, y_last
[docs]
def aic_selector(x, y, poly_max=3, cov_min=1e-10):
'''Select the detrending polynomial from the Aikaike Information Criterion
This function uses the Aikaike Information Criterion (AIC) to find the most
appropriate polynomial order to a set of X, Y data points.
parameters
----------
x : `Iterable`
The independent variable
y : `Iterable`
The dependent variable
poly_max : `int`, optional, default=3
The maximum polynomial order to test
cov_min : `float`
A threshold value for the first element of the covariance matrix.
Sometimes the AIC will automatically select a higher-order
polynomial to a distribution that is clearly best fit by the
preceeding lower-order fit. For example, a second-order fit provides a
better fit for a perfect straight line. This is a bug in the numerical
rounding. Therefore, if the value of the first element of the
covariance matrix is less than cov_min for the lower order, then the
lower order fit is selected.
returns
-------
poly_ord : `int`
The best polynomial order
coeffs : `list`
The polynomial coefficients.
'''
q = 0
N = 1.*len(x)
try:
while q < poly_max:
k1, k2 = q+1, q+2
p1, r1, _,_,_ = np.polyfit(x, y, q, full=True)
p2, r2, _,_,_ = np.polyfit(x, y, q+1, full=True)
with np.errstate(invalid='ignore'):
SSR1 = np.sum((np.polyval(p1, x) - y)**2)
SSR2 = np.sum((np.polyval(p2, x) - y)**2)
AIC1 = akaike_info_criterion_lsq(SSR1, k1, N)
AIC2 = akaike_info_criterion_lsq(SSR2, k2, N)
if (AIC1 < AIC2) | (r1 < cov_min):
poly_ord, coeffs = q, p1
return poly_ord, list(coeffs)
else:
q += 1
if q >= poly_max:
poly_ord, coeffs = q, p2
return poly_ord, list(coeffs)
except:
return 0, [1.0]
[docs]
def relative_root_mean_squared_error(true, pred):
'''Return the relative root mean squared error (RRMSE)
Given a list of predicted and true values, calculate the RRMSE
parameters
----------
true : `Iter`
The set of true (measured) values
pred : `Iter`
The set of predicted (model) values
returns
-------
rrmse : `float`
The RRMSE value
'''
num = np.sum(np.square(true - pred))
den = np.sum(np.square(pred))
squared_error = num/den
rrmse = np.sqrt(squared_error)
return rrmse
[docs]
def smooth_test(time, flux, n_avg=10):
'''Determine how to detrend a lightcurve, based on a smoothness algorithm
The idea of this function is to catch lightcurves that appear to have
periods longer than ~15 days, and are notably smooth. The function
calculates a sine fit to the (linearly) detrended lightcurve, then
smoothes it using a running mean. Finally, there are three criteria to
decide how the lightcurve should be detrended. A boolean flag is returned,
where False=individual groups and True=the whole lightcurve.
parameters
----------
time : `Iter`
A set of time coordinates
flux : `Iter`
A set of flux coordinates
n_avg : `float`, optional, default=10
The number of datapoints to be used for the running mean calculation.
returns
-------
smooth_flag : `bool`
A boolean flag to (partially) determine how the lightcurve should be
detrended (False=individual groups, True=the whole lightcurve)
'''
# 1) detrend the whole lightcurve by a linear fit, and calculate the MAD
t_new = time-time[0]
p1, r1, _,_,_ = np.polyfit(t_new, flux, 1, full=True)
f_new = flux/np.polyval(p1, t_new)
f_MAD = MAD(f_new, scale='normal')
# 2) make a sine fit to the detrended lightcurve
pops, popsc = curve_fit(sin_fit_per, t_new, f_new,
bounds=([0.5, 0.0, 0.0, 0.0],
[1.5, 0.2, 100., 2.*np.pi]))
yp = sin_fit_per(t_new, *pops)
# 3) smooth the arrays using a running mean
yp_sm = np.array(np.convolve(yp, np.ones(n_avg)/n_avg, mode='valid'))
flux_sm = np.array(np.convolve(f_new, np.ones(n_avg)/n_avg, mode='valid'))
t_sm = np.array(np.convolve(t_new, np.ones(n_avg)/n_avg, mode='valid'))
# 4) subtract the smoothed "raw" flux by the smoothed sine fit
diff_flux = flux_sm - yp_sm
d_MAD = MAD(diff_flux, scale='normal')
# 5) calculate the RRMSE between the detrended flux and the sine fit
rrmse = relative_root_mean_squared_error(f_new, yp)
# 6) false if:
# a) the predicted period from the sine fit is > 15. days
# b) the RRMSE is < 0.01
# c) the ratio of the MAD between the differential and original flux
# is < 0.25
if (rrmse < 0.01) & \
(pops[2] > 15.) & (pops[2] < 99.9) & \
(d_MAD/f_MAD < 0.25):
return True
else:
return False
[docs]
def norm_choice(t_orig, f_orig, lc_part, MAD_fac=2., poly_max=4):
'''Choose whether to detrend the lightcurve as one by individual groups.
There are always at least two data components in a TESS sector because of
the finite time needed for data retrieval. This can sometimes lead to
discontinuities between components because of TESS systematics and the
temperature gradients across the photometer. These discontinuities can
cause the phase of the sinusoidal fit to change, leading to low power
output in the periodograms. Alternatively, if the data are all detrended
individually, but the data is relatively continuous, this can lead to
shorter period measurements.
The idea with this function is that a polynomial fit is made to each
component (chosen using the Aikaike Information Criterion, AIC). The
extrapolated flux value from component 1 is calculated at the point where
component 2 starts. If the difference between this value and the actual
normalised flux at this point is greater than a given threshold, the data
should be detrended separately. Otherwise the full lightcurve can be
detrended as a whole.
In addition, if the "smooth" flag (calculated from the "smooth_test"
function) is True, then the lightcurve is detrended as a whole, regardless
of the outcome from this function.
parameters
----------
t_orig : `Iterable`
The time coordinate
f_orig : `Iterable`
The original, normalised flux values
lc_part : `Iterable`
The running index for each contiguous data section in the lightcurve
MAD_fac : `float`, optional, default=2.
The factor which is to be multiplied by the median absolute deviation.
poly_max : `float`, optional, default=4
The maximum polynomial order to test for the AIC evaluation.
returns
-------
norm_flag : `bool`
Determines whether the data should be detrended as one whole component
(False) or by individual groups (True, providing the smooth flag is
False).
smooth_flag : `bool`
Determines whether any detrending should be performed by testing how
smooth the lightcurve is.
f1_at_f2_0 : `list`
The extrapolated fluxes from group 1, calculated at the start point of
group 2.
f2_at_f2_0 : `list`
The first flux values from group 2
f1_MAD : `list`
The MAD fluxes from group 1
f2_MAD : `list`
The MAD fluxes from group 2
'''
norm_flag = False
Ncomp = len(np.unique(lc_part))
smooth_flag = smooth_test(t_orig, f_orig)
f1_at_f2_0, f2_at_f2_0, f1_MAD, f2_MAD = [], [], [], []
if Ncomp > 1:
i = 1
while i < Ncomp:
g1 = np.array(lc_part == i)
g2 = np.array(lc_part == i+1)
try:
s_fit1, coeff1 = aic_selector(t_orig[g1], f_orig[g1],
poly_max=poly_max)
s_fit2, coeff2 = aic_selector(t_orig[g2], f_orig[g2],
poly_max=poly_max)
f1_at_f2_0.append(np.polyval(coeff1, t_orig[g2][0]))
# The line below IS supposed to be at index "g2"
f2_at_f2_0.append(np.polyval(coeff2, t_orig[g2][0]))
f1_n = f_orig[g1]/np.polyval(coeff1, t_orig[g1])
f2_n = f_orig[g2]/np.polyval(coeff2, t_orig[g2])
f1_MAD.append(MAD(f1_n, scale='normal'))
f2_MAD.append(MAD(f2_n, scale='normal'))
if 2.*abs(f1_at_f2_0[i-1] - f2_at_f2_0[i-i]) > \
MAD_fac*((f1_MAD[i-1]+f2_MAD[i-1])/2.):
norm_flag = True
break
else:
i += 1
except:
logger.error('Could not run the AIC selector, '
'probably because of a zero-division.')
f1_at_f2_0.append(np.polyval([1], t_orig[g2][0]))
f2_at_f2_0.append(np.polyval([1], t_orig[g2][0]))
f1_n = f_orig[g1]
f2_n = f_orig[g2]
f1_MAD.append(MAD(f1_n, scale='normal'))
f2_MAD.append(MAD(f2_n, scale='normal'))
norm_flag = False
break
if smooth_flag == True:
norm_flag = False
return norm_flag, smooth_flag, f1_at_f2_0, f2_at_f2_0, f1_MAD, f2_MAD
[docs]
def detrend_lc(t,f,lc, MAD_fac=2., poly_max=3):
'''Detrend and normalise the lightcurves.
| This function runs 3 operations to detrend the lightcurve, as follows:
| 1. Choose whether a zeroth- or first-order polynomial is the best fit to
the full light-curve, using AIC, and detrend the full lightcurve.
| 2. Decide whether to use the detrended lightcurve from part 1, or to
detrend individual groups.
| 3. Return the detrended flux.
parameters
----------
t : `Iterable`
the time component of the lightcurve
f : `Iterable`
the flux component of the lightcurve.
lc : `Iterable`
The index representing the lightcurve component. Note this
must be indexed starting from 1.
MAD_fac : `float`, optional, default = 2.
The factor to multiply the median absolute deviation by.
poly_max : `int`, optional, default=8
The maximum order of the polynomial fit.
returns
-------
f_norm : `Iterable`
The corrected lightcurve after the detrending procedures.
detr_dict : `dict`
A dictionary containing the parameters: norm_flag, smooth_flag,
"f1_at_f2_0, f2_at_f2_0, f1_MAD, f2_MAD" (see norm_choice) and
"s_fit, coeffs" (see aic_selector)
'''
# 1. Choose the best detrending polynomial using the Aikaike Information
# Criterion, and detrend the lightcurve as a whole.
s_fit_0, coeffs_0 = aic_selector(t, f, poly_max=poly_max)
f_norm = f/np.polyval(coeffs_0, t)
# 2. Decide whether to use the detrended lightcurve from part 1, or to
# separate the lightcurve into individual components and detrend each
# one separately
norm_flag, smooth_flag, f1_at_f2_0, f2_at_f2_0, f1_MAD, f2_MAD = \
norm_choice(t, f, lc, MAD_fac=MAD_fac, poly_max=poly_max)
# 3. Detrend the lightcurve following steps 1 and 2.
s_fit, coeffs = [], []
if norm_flag:
# normalise each component separately.
f_detrend = np.array([])
for l in np.unique(lc):
g = np.array(lc == l)
s_fit_n, coeffs_n = aic_selector(t[g], f[g], poly_max=poly_max)
s_fit.append(s_fit_n)
coeffs.append(coeffs_n)
f_n = f[g]/np.polyval(coeffs_n, t[g])
f_detrend = np.append(f_detrend, f_n)
f_norm = f_detrend
else:
# normalise the entire lightcurve as a whole
f_norm = f# f_norm
detr_dict = {'norm_flag' : norm_flag,
'smooth_flag' : smooth_flag,
'f1_at_f2_0' : f1_at_f2_0,
'f2_at_f2_0' : f2_at_f2_0,
'f1_MAD' : f1_MAD,
'f2_MAD' : f2_MAD,
's_fit' : s_fit,
'coeffs' : coeffs}
return f_norm, detr_dict
[docs]
def clean_flux_algorithm(g):
'''A basic algorithm that trims both sides of a contiguous data string
if a condition is not satisfied, until the condition is met for the
first time.
parameters
----------
g : `Iter`
The outcome for each datapoint, qualified (=1) or not qualified (=0)
returns
-------
first : `int`
The trimmed first point of the array
last : `int`
The trimmed last point of the array
'''
i, j = 0, len(g)-1
while i < j:
if g[i] != 1:
i+=1
else:
first=i
break
while j > 0:
if g[j] != 1:
j-=1
else:
last=j
break
if j <= i:
first, last = 0, len(g)-1
return first, last
else:
return first, last
[docs]
def clean_edges_outlier(f, MAD_fac=2.):
'''Remove spurious outliers at the start and end parts of groups.
The start and end point of each group must have a flux value within a given
number of MAD from the median flux in the group. This is done because
during data downlinks, the temperature of the sensors can change notably.
Therefore the outlying flux points at the group edges are probably from
temperature instabilities.
parameters
----------
f : `Iterable`
The set of normalised flux coordinates
MAD_fac : `float`, optional, default=2.
The threshold number of MAD values to allow.
returns
-------
first : `int`
The start index for the data string.
last : `int`
The end index for the data string.
'''
f_med, f_MAD = np.median(f), MAD(f, scale='normal')
try:
g = (np.abs(f-f_med) < MAD_fac*f_MAD).astype(int)
first, last = clean_flux_algorithm(g)
except:
logger.error('Something went wrong with the arrays when doing the '
'lightcurve edge clipping')
first, last = 0, len(g)-1
return first, last
[docs]
def clean_edges_scatter(f, MAD_fac=2., len_sub_raw=11, num_data_fac=0.1):
'''Remove highly-scattered data at the edges of each group.
Some groups have very scattered fluxes at the edges, presumably because the
sensors are unstable before and after data downlinks. These can degrade the
quality of the periodogram analysis, or even lead to an incorrect period.
The idea is to group the first "n_sub" datapoints, and calculate the median
absolute deviation (MAD). If this local MAD value is greater (less) than
"MAD_fac" times the MAD of the full lightcurve, then the flag at this point
is 0 (1). The first and last "(n_sub-1)/2" in the lightcurve are given a
constant value. If the first/last MAD comparison yield a "1" value, then we
include the full group, including the datapoints replaced with constant
values -- i.e., no cleaning is necessary.
The value for n_sub is chosen as the minimum value of "len_sub_raw", or
num_data_fac*(the number of datapoints in the whole set).
parameters
----------
f : `Iterable`
The set of normalised flux coordinates
MAD_fac : `float`, optional, default=2.
The threshold number of MAD values to allow.
len_sub_raw : `int`, optional, default=11
The number of data points to be used in the local MAD value.
num_data_fac : `float`, optional, default=0.1
The factor to multiply the number of data points by.
returns
-------
first : `int`
The start index for the data string.
last : `int`
The end index for the data string.
'''
n_sub =min(len_sub_raw, int(num_data_fac*len(f)))
if n_sub // 2 == 0:
n_sub += 1
p_e = int((n_sub-1)/2)
# get the median time and flux, the median absolute deviation in flux
# and the time difference for each neighbouring point.
f_med, f_MAD = np.median(f), MAD(f, scale='normal')
f_diff = np.zeros(len(f))
f_diff[1:] = np.diff(f)
f_diff_med = np.median(np.absolute(f_diff))
f_x = np.array([MAD(f[i:i+n_sub], scale='normal')
for i in range(len(f)-n_sub+1)])
f_diff_run = np.pad(f_x, (p_e, p_e), 'constant',
constant_values=(MAD_fac*f_MAD, MAD_fac*f_MAD))
try:
g = (np.abs(f_diff_run) < MAD_fac*f_diff_med).astype(int)
first, last = clean_flux_algorithm(g)
if first <= p_e:
first = 0
elif first > p_e:
first = np.where(g)[0][p_e]
if last >= len(g)-1-(2*p_e+1):
last = len(g)-1
elif last < len(g)-1-(2*p_e+1):
last = np.where(g)[0][-p_e]
except:
logger.error('Something went wrong with the arrays when doing the '
'lightcurve edge clipping')
first, last = 0, len(g)-1
return first, last
[docs]
def run_make_lc_steps(f_lc, f_orig, min_crit_frac=0.1, min_crit_num=50,
outl_mad_fac=3.):
'''Process the lightcurve: cleaning, normalisation and detrending functions
| During each procedure, the function keeps a record of datapoints that are
| kept or rejected, allowing users to assess the amount of data loss.
| The function makes the following steps...
| 1. normalise the original flux points
| 2. split the lightcurve into 'time segments'
| 3. remove very sparse elements from the lightcurve
| 4. run the first detrending process to pass to the cleaning function.
| 5. clean the lightcurve edges from outliers
| 6. clean the lightcurve edges from scattered data
| 7. finally cut out data that are extreme outliers.
| 8. divide each lightcurve component by the median flux value
| of qualifying data points.
| 9. return the dictionary
parameters
----------
f_lc : `dict`
The initial lightcurve with the minimum following keys required:
(1) 'time' -> the time coordinate
(2) 'eflux' -> the error in the flux
(3) 'f_orig' -> see the f_orig parameter
f_orig : `str`
This string determines which of the original flux values to choose.
It forms the final part of the f_lc keys.
It could be either 'reg_oflux' (the regular, original flux) or
'cbv_oflux' (the original flux corrected using co-trending basis
vectors)
min_crit_frac : `float`, optional, default=0.1
The minimum relative size of a flux component when correcting for
sparse data in the cleaning functions.
min_crit_num : `int`, optional, default=50
The minimum number of data points required for a flux component in the
sparse data cleaning functions.
outl_mad_fac : `float`, optional, default=3.
The factor of MAD for the cleaned lightcurve flux values.
returns
-------
f_lc : `dict`
A dictionary storing the full set of results from the lightcurve
analysis.
As well as the keys from the inputs, the final keys returned are:
1: "time" -> the time coordinate.
2: "mag" -> the TESS magnitude.
3: "(reg/cbv)_oflux" -> the flux calculated from aperture photometry.
4: "eflux" -> the error bar on (reg/cbv)_oflux.
5: "nflux_ori" -> the normalised fluxes from (3).
6: "nflux_err" -> the error bars on (5).
7: "nflux_dtr" -> the normalised fluxes after the detrending steps.
8: "lc_part" -> an index referring to each group in the lightcurve.
9: "pass_sparse" -> boolean from `remove_sparse_data`
10: "pass_clean_outlier" -> boolean from clean_edges_outlier.
11: "pass_clean_scatter" -> boolean from clean_edges_scatter.
12: "pass_full_outlier" -> boolean from the final outlier rejection.
detr_dict : `dict`
The dictionary returned from `detrend_lc`
'''
# (1) normalise the original flux points
f_lc['nflux_ori'] = f_lc[f'{f_orig}']/np.median(f_lc[f'{f_orig}'])
f_lc['nflux_err'] = f_lc['eflux']/f_lc[f'{f_orig}']
logger.info('part1: initial normalisation -> done!')
# (2) split the lightcurve into 'time segments'
ds1, df1 = get_time_segments(f_lc["time"])
logger.info('part2: time segmentation -> done!')
# (3) remove very sparse elements from the lightcurve
ds2, df2 = remove_sparse_data(ds1, df1, min_crit_frac=min_crit_frac,
min_crit_num=min_crit_num)
f_lc["pass_sparse"] = np.array(np.zeros(len(f_lc["time"])), dtype='bool')
for s, f in zip(ds2, df2):
f_lc["pass_sparse"][s:f] = True
logger.info('part3: remove sparse data -> done!')
# (4) run the first detrending process to pass to the cleaning function.
f_lc["lc_part"] = np.zeros(len(f_lc["time"]), dtype=int)
for i, (s, f) in enumerate(zip(ds2, df2)):
f_lc["lc_part"][s:f] = int(i+1)
g_cln = f_lc["pass_sparse"]
f_lc["nflux_dtr"] = np.full(len(f_lc["time"]), -999.)
f_lc["nflux_dtr"][g_cln], detr_dict = detrend_lc(f_lc["time"][g_cln],
f_lc["nflux_ori"][g_cln],
f_lc["lc_part"][g_cln],
poly_max=1)
logger.info('part4: detrending -> done!')
# (5) clean the lightcurve edges from outliers
ds3, df3 = [], []
for lc in np.unique(f_lc["lc_part"][g_cln]):
g = np.where(f_lc["lc_part"] == lc)[0]
s_o, f_o = clean_edges_outlier(f_lc["nflux_dtr"][g])
ds3.append(g[s_o])
df3.append(g[f_o])
f_lc["pass_clean_outlier"] = np.array(np.zeros(len(f_lc["time"])),
dtype='bool')
for s, f in zip(ds3, df3):
f_lc["pass_clean_outlier"][s:f] = True
logger.info('part5: clean edges, outliers -> done!')
# (6) clean the lightcurve edges from scattered data
ds4, df4 = [], []
for lc in np.unique(f_lc["lc_part"][g_cln]):
g = np.where(f_lc["lc_part"] == lc)[0]
s_s, f_s = clean_edges_scatter(f_lc["nflux_dtr"][g])
ds4.append(g[s_s])
df4.append(g[f_s])
f_lc["pass_clean_scatter"] = np.array(np.zeros(len(f_lc["time"])),
dtype='bool')
for s, f in zip(ds4, df4):
f_lc["pass_clean_scatter"][s:f] = True
logger.info('part6: clean edges, scatter -> done!')
# (7) finally cut out data that are extreme outliers.
med_lc = np.median(f_lc["nflux_dtr"][g_cln])
MAD_lc = MAD(f_lc["nflux_dtr"][g_cln], scale='normal')
f_lc["pass_full_outlier"] = np.array(np.zeros(len(f_lc["time"])),
dtype='bool')
for f in range(len(f_lc["time"])):
if abs(f_lc["nflux_dtr"][f] - med_lc) < outl_mad_fac*MAD_lc:
f_lc["pass_full_outlier"][f] = True
logger.info('part7: remove extreme points -> done!')
# (8) divide each lightcurve component by the median flux value of
# qualifying data points.
for lc in np.unique(f_lc["lc_part"][g_cln]):
g = np.where(f_lc["lc_part"] == lc)[0]
gx = np.logical_and.reduce([
f_lc["pass_sparse"][g],
f_lc["pass_clean_scatter"][g],
f_lc["pass_clean_outlier"][g],
f_lc["pass_full_outlier"][g]
])
flux_vals = f_lc["nflux_dtr"][g[gx]]
med_flux = np.median(flux_vals[flux_vals > 0.])
f_lc["nflux_dtr"][f_lc["nflux_dtr"] < 0] = -999
logger.info('part8: write the dictionary -> done!')
# (9) return the dictionary
logger.info('part9: FINISHED!')
return f_lc, detr_dict
[docs]
def sin_fit(x, y0, A, phi):
'''
Returns the best parameters (y_offset, amplitude, and phase) to a regular
sinusoidal function.
parameters
----------
x : `Iterable`
list of input values
y0 : `float`
The midpoint of the sine curve
A : `float`
The amplitude of the sine curve
phi : `float`
The phase angle of the sine curve
returns
-------
sin_fit : `list`
A list of sin curve values.
'''
sin_fit = y0 + A*np.sin(2.*np.pi*x + phi)
return sin_fit
[docs]
def sin_fit_per(t, y0, A, per, phi):
'''
Returns the best parameters (y_offset, amplitude, and phase) to a regular
sinusoidal function.
parameters
----------
t : `Iterable`
list of input values
y0 : `float`
The midpoint of the sine curve
A : `float`
The amplitude of the sine curve
per : `float`
The period of the sine curve
phi : `float`
The phase angle of the sine curve
returns
-------
sin_fit_per : `list`
A list of sin curve values.
'''
sin_fit_per = y0 + A*np.sin((2.*np.pi*t/per) + phi)
return sin_fit_per
[docs]
def cbv_fit_test(t, of, cf):
'''Determine whether the cbv-corrected lightcurve should be considered.
Whilst the cbv-corrected flux are designed to eliminate systematic
artefacts by identifying features common to many stars (using principle
component analysis), the routine can overfit the data, and often the cbv
corrections inject too much unwanted noise (particularly for targets with
low signal to noise).
Therefore the plan here is to assess lightcurves produced by the cbv
corrections by comparing basic attributes with the regular (non-corrected)
lightcurves. These scores come down to:
1: the number of outliers
2: the size of the median absolute deviation
3: which lightcurve provides the lowest chi-squared value to a sine fit.
If the "original lightcurve" scores higher, then the cbv-corrected
lightcurve is not considered for further analysis.
parameters
----------
t : Iterable
The time component of the lightcurve
of : Iterable
The original flux
cf : Iterable
The cbv-corrected flux
returns
-------
use_cbv : bool
True if cf score > of score, else False.
'''
of_score, cf_score = 0, 0
#1) number of outliers test
of_nflux = np.array(of)/np.median(of)
cf_nflux = np.array(cf)/np.median(cf)
of_nMADf = MAD(of_nflux, scale='normal')
cf_nMADf = MAD(cf_nflux, scale='normal')
num_of = np.sum(abs(of_nflux-1.) > of_nMADf)
num_cf = np.sum(abs(cf_nflux-1.) > cf_nMADf)
if num_of > num_cf:
cf_score += 1
else:
of_score += 1
#2) which has the largest MAD value
if of_nMADf > cf_nMADf:
cf_score += 1
else:
of_score += 1
#3) which makes the best sine fit?
try:
pops_of, popsc_of = curve_fit(sin_fit, t, of_nflux,
bounds=(0, [2., 2., 1000.]))
pops_cf, popsc_cf = curve_fit(sin_fit, t, cf_nflux,
bounds=(0, [2., 2., 1000.]))
yp_of = sin_fit(of_nflux, *pops_of)
yp_cf = sin_fit(cf_nflux, *pops_cf)
chi_of = np.sum((yp_of-of_nflux)**2)/(len(of_nflux)-len(pops_of)-1)
chi_cf = np.sum((yp_cf-cf_nflux)**2)/(len(cf_nflux)-len(pops_cf)-1)
if chi_of > chi_cf:
cf_score += 1
else:
of_score += 1
except:
logger.error('Could not do the sine-fit comparison for ori vs cbv '
'lightcurves')
# get the final score - if cbv wins, then a True statement is returned.
if of_score >= cf_score:
use_cbv = False
else:
use_cbv = True
return use_cbv
[docs]
def make_lc(phot_table, name_lc='target', store_lc=False, lc_dir='lc', cbv_flag=False):
'''Construct the normalised, detrended, cleaned TESS lightcurve.
This is essentially a parent function that performs all the steps in fixing
the lightcurve.
The returned product is an array containing the new tabulated lightcurve
data for the original (unfiltered) aperture photometry, and (if necessary)
another one for the CBV-corrected fluxes (see the 'cbv_fit_test' function
for more information.)
parameters
----------
phot_table : `astropy.table.Table` or `dict`
| The data table containing aperture photometry. Columns must include:
| "time" -> The time coordinate for each image
| "mag" -> The target magnitude
| "(reg/cbv)_oflux" -> The total flux subtracted by the background flux
| "flux_err" -> The error on flux_corr
name_lc : `str`, optional, default='target'
The name of the file which the lightcurve data will be saved to.
The target name
store_lc : `bool`, optional, default=False
Choose to save the cleaned lightcurve to file
lc_dir : `str`, optional, default='lc'
The directory used to store the lightcurve files if lc_dir==True
cbv_flag : `bool`, optional, default=False
Choose whether to analyse the lightcurves for CBV-corrected data.
returns
-------
final_tabs : `list`
A list of tables containing the lightcurve data
These are for the original lightcurve, and the cbv-corrected lightcurve
if required and it satisfies the criteria from cbv_fit_test.
norm_flags : `list`
A list of norm_flag values from the detrending algorithm.
smooth_flags : `list`
A list of smooth_flag values from the detrending algorithm.
'''
logger.info(f'Running the lightcurve analysis for {name_lc}')
f_labels = ['reg_oflux']
cbv_ret = False
if cbv_flag:
if "cbv_oflux" in phot_table.colnames:
f_labels.append('cbv_oflux')
use_cbv = cbv_fit_test(phot_table["time"], phot_table["reg_oflux"],
phot_table["cbv_oflux"])
if use_cbv:
cbv_ret = True
final_tabs, norm_flags, smooth_flags = [], [], []
for f_label in f_labels:
logger.info(f'using the flux label: {f_label}')
final_lc = {}
final_lc["time"] = phot_table["time"].data
final_lc["mag"] = phot_table["mag"].data
final_lc[f'{f_label}'] = phot_table[f'{f_label}'].data
final_lc["eflux"] = phot_table["flux_err"].data
flux_dict, detr_dict = run_make_lc_steps(final_lc, f_label)
norm_flag = detr_dict["norm_flag"]
smooth_flag = detr_dict["smooth_flag"]
keyorder = ['time','mag',f_label,'eflux','nflux_ori','nflux_err',
'nflux_dtr','lc_part','pass_sparse','pass_clean_outlier',
'pass_clean_scatter','pass_full_outlier']
tab_format = ['.6f','.6f','.6f','.6f',
'.6f','.4e','.6f','%i',
'%s','%s','%s','%s']
flux_dict = {k: flux_dict[k] for k in keyorder}
if len(flux_dict["time"]) > 50:
flux_tab = Table(flux_dict)
for n, f in zip(keyorder, tab_format):
flux_tab[n].info.format = f
# for k, f in zip(keyorder, tab_format):
# flux_tab[k].info.format =x f
# if f_label == "reg_oflux":
final_tabs.append(flux_tab)
norm_flags.append(norm_flag)
smooth_flags.append(smooth_flag)
# if (f_label == "cbv_oflux") and (cbv_ret):
# final_tabs.append(flux_tab)
# norm_flags.append(norm_flag)
# smooth_flags.append(smooth_flag)
if store_lc:
path_exist = os.path.exists(f'./{lc_dir}')
if not path_exist:
os.makedirs(f'./{lc_dir}')
flux_tab.write(f'./{lc_dir}/{name_lc}_{f_label}.csv',
format='csv', overwrite=True)
with open(f'./{lc_dir}/{name_lc}_{f_label}.json', 'w') \
as convert_file:
convert_file.write(json.dumps(detr_dict))
return final_tabs, norm_flags, smooth_flags
__all__ = [item[0] for item in inspect.getmembers(sys.modules[__name__],
predicate = lambda f: inspect.isfunction(f)
and f.__module__ == __name__)]