Skip to content

Commit

Permalink
feat: show peak info
Browse files Browse the repository at this point in the history
  • Loading branch information
Lan Le committed Apr 24, 2024
1 parent ede0d0c commit db34088
Showing 1 changed file with 153 additions and 50 deletions.
203 changes: 153 additions & 50 deletions chem_spectra/lib/composer/ni.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re # noqa: E402
import numpy as np # noqa: E402
from matplotlib import ticker # noqa: E402
from matplotlib.patches import FancyArrowPatch
import csv

from chem_spectra.lib.composer.base import ( # noqa: E402, F401
Expand Down Expand Up @@ -273,12 +274,10 @@ def tf_img(self):
plt.xlim(xlim_left, xlim_right)
y_max, y_min = np.max(self.core.ys), np.min(self.core.ys)
h = y_max - y_min
w = x_max - x_min
y_boundary_min = y_min - h * 0.2
y_boundary_max = y_max + h * 0.2
# plt.ylim(
# y_min - h * 0.2,
# y_max + h * 0.2,
# )
y_boundary_max = y_max + h * 0.5

# PLOT peaks
faktor = self.__fakto()
path_data = [
Expand Down Expand Up @@ -366,11 +365,9 @@ def tf_img(self):
markersize=50,
)

# ----- PLOT integration -----
# ----- Calculate integration -----
cys = []
refShift, refArea = self.refShift, self.refArea
itg_h = y_max + h * 0.1
itg_h = itg_h + itg_h * 0.1
if (len(self.all_itgs) == 0 and len(self.core.itg_table) > 0 and not self.core.params['integration'].get('edited') and ('originStack' not in self.core.params['integration'])):
core_itg_table = self.core.itg_table[0]
itg_table = core_itg_table.split('\n')
Expand All @@ -383,40 +380,9 @@ def tf_img(self):
xUStr = split_itg[1].strip()
areaStr = split_itg[2].strip()
self.all_itgs.append({'xL': float(xLStr), 'xU': float(xUStr), 'area': float(areaStr)}) # noqa: E501
for itg in self.all_itgs:
# integration marker
xL, xU, area = itg['xL'] - refShift, itg['xU'] - refShift, itg['area'] * refArea # noqa: E501
# integration curve
ks = calc_ks(self.core.ys, y_max, h)
iL, iU = get_curve_endpoint(self.core.xs, self.core.ys, xL, xU)
ref = ks[iL]
cxs = self.core.xs[iL:iU]

if self.core.is_hplc_uv_vis:
# fill area under curve
cys = self.core.ys[iL:iU]
slope = cal_slope(cxs[0], cys[0], cxs[len(cxs)-1], cys[len(cys)-1]) # noqa: E501
last_y = cys[0]
last_x = cxs[0]
aucys = [last_y]
for i in range(1, len(cys)):
curr_x = cxs[i]
curr_y = slope*(curr_x-last_x) + last_y
aucys.append(curr_y)
last_x = curr_x
last_y = curr_y
plt.fill_between(cxs, y1=cys, y2=aucys, alpha=0.2, color='#FF0000') # noqa: E501
else:
# display integration
plt.plot([xL, xU], [itg_h, itg_h], color='#228B22')
plt.plot([xL, xL], [itg_h + h * 0.01, itg_h - h * 0.01], color='#228B22') # noqa: E501
plt.plot([xU, xU], [itg_h + h * 0.01, itg_h - h * 0.01], color='#228B22') # noqa: E501
plt.text((xL + xU) / 2, itg_h + h * 0.015, '{:0.2f}'.format(area), color='#228B22', ha='center', size=12) # noqa: E501
# cys = (ks[iL:iU] - ref) * 1.5 + (y_max - h * 0.4)
cys = (ks[iL:iU] - ref) * 1.5 + itg_h + itg_h * 0.1
plt.plot(cxs, cys, color='#228B22')


# ----- PLOT multiplicity -----
# ----- Calculate multiplicity -----
if (len(self.mpys) == 0 and len(self.core.mpy_itg_table) > 0 and not self.core.params['integration'].get('edited') and ('originStack' not in self.core.params['integration'])):
core_mpy_pks_table = self.core.mpy_pks_table[0]
mpy_pks_table = core_mpy_pks_table.split('\n')
Expand Down Expand Up @@ -454,23 +420,29 @@ def tf_img(self):
mpy_item['mpyType'] = typeStr
mpy_item['peaks'] = tmp_dic_mpy_peaks[idxStr]
self.mpys.append(mpy_item)
mpy_h = y_min - h * 0.08

# ----- PLOT integration -----
itg_h = y_max + h * 0.6
itg_h = itg_h + itg_h * 0.1
itg_value_position_x = y_min - h * 0.2
y_boundary_min += itg_value_position_x
if (len(self.mpys) == 0):
itg_value_position_x = y_min - h * 0.05
y_boundary_max = self.__draw_integrals(plt, refShift, refArea, y_max, h, itg_value_position_x, itg_h)

# ----- PLOT multiplicity -----
mpy_h = y_min - h * 0.06
for mpy in self.mpys:
xL, xU, area, typ, peaks = mpy['xExtent']['xL'] - refShift, mpy['xExtent']['xU'] - refShift, mpy['area'] * refArea, mpy['mpyType'], mpy['peaks'] # noqa: E501
plt.plot([xL, xU], [mpy_h, mpy_h], color='#DA70D6')
plt.plot([xL, xL], [mpy_h + h * 0.01, mpy_h - h * 0.01], color='#DA70D6') # noqa: E501
plt.plot([xU, xU], [mpy_h + h * 0.01, mpy_h - h * 0.01], color='#DA70D6') # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.1, '({})'.format(typ), color='#DA70D6', ha='center', size=12) # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.06, '{:0.3f}'.format(calc_mpy_center(mpy['peaks'], refShift, mpy['mpyType'])), color='#DA70D6', ha='center', size=12) # noqa: E501
# plt.text((xL + xU) / 2, mpy_h - h * 0.08, '({})'.format(typ), color='#DA70D6', size=10, rotation=90.) # noqa: E501
plt.text((xL + xU) / 2, mpy_h - h * 0.01, '{:0.3f}\n({})'.format(calc_mpy_center(mpy['peaks'], refShift, mpy['mpyType']), typ), color='#DA70D6', size=10, rotation=90., ha='right', va='top', rotation_mode='anchor') # noqa: E501
for p in peaks:
x = p['x']
plt.plot([x - refShift, x - refShift], [mpy_h, mpy_h + h * 0.05], color='#DA70D6') # noqa: E501

try:
cys_max = np.max(cys)
y_boundary_max = cys_max + cys_max * 0.1
except:
pass
plt.ylim(
y_boundary_min,
y_boundary_max,
Expand All @@ -492,13 +464,145 @@ def tf_img(self):
plt.ylabel("Y ({})".format(self.core.label['y']), fontsize=18)
plt.locator_params(nbins=self.__plt_nbins())
plt.grid(False)

self.__draw_peaks(plt, x_peaks, y_peaks, h, w)

# Save
tf_img = tempfile.NamedTemporaryFile(suffix='.png')
plt.savefig(tf_img, format='png')
tf_img.seek(0)
plt.clf()
plt.cla()
return tf_img


def __draw_integrals(self, plt, refShift, refArea, y_max, h, itg_value_position_x, itg_h):
y_boundary_max = y_max
for itg in self.all_itgs:
# integration marker
xL, xU, area = itg['xL'] - refShift, itg['xU'] - refShift, itg['area'] * refArea # noqa: E501
# integration curve
ks = calc_ks(self.core.ys, y_max, h)
iL, iU = get_curve_endpoint(self.core.xs, self.core.ys, xL, xU)
ref = ks[iL]
cxs = self.core.xs[iL:iU]

if self.core.is_hplc_uv_vis:
# fill area under curve
cys = self.core.ys[iL:iU]
slope = cal_slope(cxs[0], cys[0], cxs[len(cxs)-1], cys[len(cys)-1]) # noqa: E501
last_y = cys[0]
last_x = cxs[0]
aucys = [last_y]
for i in range(1, len(cys)):
curr_x = cxs[i]
curr_y = slope*(curr_x-last_x) + last_y
aucys.append(curr_y)
last_x = curr_x
last_y = curr_y
plt.fill_between(cxs, y1=cys, y2=aucys, alpha=0.2, color='#FF0000') # noqa: E501
else:
# display integration
plt.plot([xL, xU], [itg_value_position_x, itg_value_position_x], color='#228B22')
plt.plot([xL, xL], [itg_value_position_x + h * 0.01, itg_value_position_x - h * 0.01], color='#228B22') # noqa: E501
plt.plot([xU, xU], [itg_value_position_x + h * 0.01, itg_value_position_x - h * 0.01], color='#228B22') # noqa: E501
plt.text((xL + xU) / 2, itg_value_position_x - h * 0.01, '{:0.2f}'.format(area), color='#228B22', ha='right', rotation_mode='anchor', size=10, rotation=90.) # noqa: E501
cys = (ks[iL:iU] - ref) * 1.5 + (y_max + h * 0.15)
plt.plot(cxs, cys, color='#228B22')
try:
cys_max = np.max(cys)
cys_min = np.min(cys)
y_boundary_max = max(y_boundary_max, cys_max + h*0.1)
except:
pass

return y_boundary_max


def __draw_peaks(self, plt, x_peaks, y_peaks, h, w):
if self.core.non_nmr == True or len(x_peaks) == 0:
return plt
ax = plt.gca()
differences = np.diff(x_peaks)
grouping_threshold = np.mean(differences)
groups_x = []
groups_y = []
current_group_x = [x_peaks[0]]
current_group_y = [y_peaks[0]]

for i in range(1, len(x_peaks)):
if (x_peaks[i] - current_group_x[-1] < grouping_threshold):
current_group_x.append(x_peaks[i])
current_group_y.append(y_peaks[i])
else:
groups_x.append(current_group_x)
current_group_x = [x_peaks[i]]
groups_y.append(current_group_y)
current_group_y = [y_peaks[i]]

groups_x.append(current_group_x)
groups_y.append(current_group_y)
max_values = [np.max(items) for items in groups_y]
x_boundary_min, x_boundary_max = np.min(x_peaks), np.max(x_peaks)

for i in range(len(groups_x)):
mygroup_x, mygroup_y = groups_x[i], groups_y[i]
len_group_x = len(mygroup_x)
if len_group_x > 1:
max_current_group = max_values[i] + h * 0.2
if (i > 0):
prev_max_group = max_values[i-1] + h * 0.2
my_gap = abs(max_current_group - prev_max_group)
if my_gap < h*0.1:
max_current_group = max_current_group + h * 0.45
middle_idx = int(len_group_x/2)
gap_value = np.mean(mygroup_x)
x_text = 0
for j in range(len_group_x):
x_pos = mygroup_x[j]
y_pos = mygroup_y[j] + h * 0.5
x_float = '{:.2f}'.format(x_pos)
peak_label = '{x}'.format(x=x_float)
if j >= middle_idx:
x_text = -(w * 0.01) * (middle_idx - j)
elif j < middle_idx:
x_text = w * 0.01 * (j-middle_idx)

ax.add_patch(FancyArrowPatch((x_pos, max_current_group), (x_pos, max_current_group + h * 0.05), linewidth=0.2))
ax.add_patch(FancyArrowPatch((x_pos, max_current_group + h * 0.05), (gap_value + x_text, max_current_group + h * 0.11), linewidth=0.2))

x_boundary_min = min(gap_value + x_text, x_boundary_min)
x_boundary_max = max(gap_value + x_text, x_boundary_max)

ax.annotate(peak_label,
xy=(gap_value + x_text, max_current_group + h * 0.11), xycoords='data',
xytext=(0, 12), textcoords='offset points',
arrowprops=dict(arrowstyle="-", linewidth=0.2),
rotation=90, size=10)
else:
x_pos = mygroup_x[0]
y_pos = mygroup_y[0] + h * 0.08
x_float = '{:.2f}'.format(x_pos)
peak_label = '{x}'.format(x=x_float)
ax.annotate(peak_label,
xy=(x_pos, y_pos), xycoords='data',
xytext=(0, 20), textcoords='offset points',
arrowprops=dict(arrowstyle="-", linewidth=0.2),
rotation=90, size=10)

x_boundary_min = x_boundary_min - w * 0.01
x_boundary_max = x_boundary_max + w * 0.02
if self.core.ncl == '13C':
x_boundary_min = min(x_boundary_min, -10.0)
x_boundary_max = max(x_boundary_max, 195.0)
elif self.core.ncl == '1H':
x_boundary_min = min(x_boundary_min, -0.2)
x_boundary_max = max(x_boundary_max, 9.0)
plt.xlim(
x_boundary_max,
x_boundary_min,
)
return plt

def __prepare_metadata_info_for_csv(self, csv_writer: csv.DictWriter):
csv_writer.writerow({
Expand All @@ -525,7 +629,6 @@ def __prepare_metadata_info_for_csv(self, csv_writer: csv.DictWriter):
})
csv_writer.writerow({
})


def tf_csv(self):
if self.core.is_cyclic_volta == False:
Expand Down

0 comments on commit db34088

Please sign in to comment.