API reference

LaserTRAM

The class LaserTRAM is devoted to the "time resolved analysis" operations during the laser data reduction process. It is meant to be used in conjunction with the LaserCalc class. The general idea is that it creates an object that contains all the information related to one individual spot analysis.
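
A minimal usage sketch is shown below. It assumes the class is importable as from lasertram import LaserTRAM (it lives in lasertram/tram/tram.py), that all_data is an LT_ready style pandas DataFrame indexed by SampleLabel with timestamp, Time, and analyte columns, and that the file name, spot name, and interval times are hypothetical:

import pandas as pd

from lasertram import LaserTRAM

# hypothetical LT_ready file and spot name
all_data = pd.read_excel("my_LT_ready_file.xlsx").set_index("SampleLabel")

spot = LaserTRAM(name="BCR-2G_1")

# attach the raw counts-per-second data for this spot
spot.get_data(all_data.loc["BCR-2G_1", :], time_units="ms")

# choose the internal standard analyte and the background/keep intervals (seconds)
spot.assign_int_std("29Si")
spot.assign_intervals(bkgd=(5, 10), keep=(25, 50))

# background correction, detection limits, normalization, and the final report
spot.get_bkgd_data()
spot.get_detection_limits()
spot.subtract_bkgd()
spot.normalize_interval()
spot.make_output_report()

print(spot.output_report)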

Source code in lasertram\tram\tram.py
class LaserTRAM:
    """
    # LaserTRAM
    The class `LaserTRAM` which is devoted to the "time resolved analysis"
    operations during the laser data reduction process. To be used in
    conjunction with the `LaserCalc` class. The general idea is that
    this creates an object that contains all the information related
    to one individual spot analysis.

    """

    def __init__(self, name):
        """

        Args:
            name (str): your sample name i.e. the value in the `SampleLabel` column of the LT_ready file
        """
        # all attributes in relative chronological order that they are created in
        # if everything is done correctly. These all will get rewritten throughout the
        # data processing pipeline but this allows us to see what all the potential attributes
        # are going to be from the beginning (PEP convention)

        # for the math involved please see:

        # name of the lasertram spot object
        self.name = name

        # boolean flag for whether or not the data have been
        # despiked
        self.despiked = False

        # list of elements that have been despiked. Also may be 'all'
        self.despiked_elements = None

        # data from a single spot to be processed. 2D pandas dataframe
        self.data = None

        # self.data but as a 2D numpy matrix. Equivalent to self.data.values
        self.data_matrix = None

        # list of analytes in the analysis
        self.analytes = None

        # datetime corresponding to the analysis
        self.timestamp = None

        # string representation of the internal standard analyte for the processing.
        # this is just the column header of the analyte chosen as the internal
        # standard e.g., "29Si"
        self.int_std = None

        # column number in self.data_matrix that denotes the internal standard analyte
        # data. Remember python starts counting at 0!
        self.int_std_loc = None

        # background interval start time
        self.bkgd_start = None

        # background interval stop time
        self.bkgd_stop = None

        # desired ablation interval start time
        self.int_start = None

        # desired ablation interval stop time
        self.int_stop = None

        # row in self.data corresponding to self.bkgd_start
        self.bkgd_start_idx = None

        # row in self.data corresponding to self.bkgd_stop
        self.bkgd_stop_idx = None

        # row in self.data corresponding to self.int_start
        self.int_start_idx = None

        # row in self.data corresponding to self.int_stop
        self.int_stop_idx = None

        # desired omitted region start time
        self.omit_start = None

        # desired omitted region stop time
        self.omit_stop = None

        # row in self.data corresponding to self.omit_start
        self.omit_start_idx = None

        # row in self.data corresponding to self.omit_stop
        self.omit_stop_idx = None

        # boolean flag for whether or not a portion of the ablation
        # interval has been omitted
        self.omitted_region = None

        # 1D array of median background values [self.bkgd_start - self.bkgd_stop)
        # that is len(analytes) in shape
        self.bkgd_data_median = None

        # 1D array of detection limits in counts per second
        # that is len(analytes) in shape
        self.detection_limits = None

        # 2D array of background corrected data over the self.int_start - self.int_stop
        # region
        self.bkgd_subtract_data = None

        # 2D array of background corrected data over the self.int_start - self.int_stop
        # region that is normalized to the internal standard
        self.bkgd_subtract_normal_data = None

        # 1D array of median background corrected normalized values over the self.int_start - self.int_stop
        # region that is len(analytes) in shape
        self.bkgd_subtract_med = None

        # 1D array of 1 standard error of the mean values for each analyte over the
        # self.int_start - self.int_stop region
        self.bkgd_subtract_std_err = None

        # 1D array of relative standard error of the mean (in percent) for each
        # analyte over the self.int_start - self.int_stop region
        self.bkgd_subtract_std_err_rel = None

        # 1D pandas dataframe that contains many of the attributes created during the
        # LaserTRAM process:
        # |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
        # |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
        self.output_report = None

    def get_data(self, df, time_units="ms"):
        """assigns raw counts/sec data to the object

        Args:
            df (pandas DataFrame): raw data corresponding to the spot being processed i.e., `all_data.loc[spot,:]` if `all_data` is the LT_ready file
            time_units (str): string denoting the units for the `Time` column. Used to convert input time values to seconds. Defaults to 'ms'.
        """
        # get data and set index to "SampleLabel" column
        self.data = df.reset_index()
        self.data = self.data.set_index("SampleLabel")

        # convert time units from ms --> s if applicable
        if time_units == "ms":
            self.data["Time"] = self.data["Time"] / 1000
        elif time_units == "s":
            pass

        # just numpy matrix for data
        self.data_matrix = self.data.iloc[:, 1:].to_numpy()

        # list of analytes in experiment
        self.analytes = self.data.loc[:, "Time":].columns.tolist()[1:]

        # need to add check for if this exists otherwise there is no timestamp attribute
        self.timestamp = str(self.data.loc[:, "timestamp"].unique()[0])

    def assign_int_std(self, int_std):
        """assigns the spot an internal standard
        analyte

        Args:
            int_std (str): the name of the column for the internal standard analyte e.g., "29Si"
        """

        # set the internal standard analyte
        self.int_std = int_std

        # get the internal standard array index
        self.int_std_loc = np.where(np.array(self.analytes) == self.int_std)[0][0]

    def assign_intervals(self, bkgd, keep, omit=None):
        """assigns the intervals to be used as background
        as well as the portion of the ablation interval to
        be used in calculating concentrations

        Args:
            bkgd (tuple): (start, stop) pair of values corresponding to the analysis time where the background signal starts and stops
            keep (tuple): (start, stop) pair of values corresponding to the analysis time where the interval signal for concentrations starts and stops
            omit (tuple): (start, stop) pair of values corresponding to the analysis time to be omitted from the `keep` interval. Defaults to None.
        """

        # set background and interval times in s
        self.bkgd_start = bkgd[0]
        self.bkgd_stop = bkgd[1]
        self.int_start = keep[0]
        self.int_stop = keep[1]

        # equivalent background and interval times but as indices
        # in their respective arrays
        self.bkgd_start_idx = np.where(self.data["Time"] > self.bkgd_start)[0][0]
        self.bkgd_stop_idx = np.where(self.data["Time"] > self.bkgd_stop)[0][0]
        self.int_start_idx = np.where(self.data["Time"] > self.int_start)[0][0]
        self.int_stop_idx = np.where((self.data["Time"] > self.int_stop))[0][0]

        # boolean whether or not there is an omitted region
        self.omitted_region = False
        # if omission is true, set those start and stop times like above
        if omit:
            self.omit_start = omit[0]
            self.omit_stop = omit[1]
            self.omit_start_idx = (
                np.where(self.data["Time"] > self.omit_start)[0][0] - self.int_start_idx
            )
            self.omit_stop_idx = (
                np.where(self.data["Time"] > self.omit_stop)[0][0] - self.int_start_idx
            )

            self.omitted_region = True

    def get_bkgd_data(self):
        """
        uses the intervals assigned in `assign_intervals` to take the median
        value of all analytes within that range and use them as the
        background signal that gets subtracted from the ablation signal
        """
        # median background data values
        self.bkgd_data_median = np.median(
            self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
        )

    def get_detection_limits(self):
        """
        Calculates detection limits in counts per second for each analyte. This
        is defined as the median background value plus three standard deviations
        of the background signal.
        """

        self.detection_limits = np.std(
            self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
        ) * 3 + np.median(
            self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
        )

    def subtract_bkgd(self):
        """
        subtract the median background values calculated in `get_bkgd_data`
        from the signal in the "keep" interval established in `assign_intervals`

        """
        self.bkgd_subtract_data = (
            self.data_matrix[self.int_start_idx : self.int_stop_idx, 1:]
            - self.bkgd_data_median
        )

    def normalize_interval(self):
        """
        normalize the analytes from the "keep" portion of the signal
        to the internal standard analyte. This is done by simply
        dividing the analytes by the internal standard analyte.

        This also calculates the median normalized value, its
        standard error of the mean, and relative standard error
        of the mean.
        """

        # set the detection limit thresholds to be checked against
        # the interval data. This is just the detection limits with the
        # median background contribution removed, so they can be compared
        # directly with background subtracted data
        threshold = self.detection_limits - self.bkgd_data_median

        # if there's an omitted region, remove it from the data to be further processed
        # for the chosen interval
        if self.omitted_region is True:
            self.bkgd_subtract_normal_data = np.delete(
                self.bkgd_subtract_data,
                np.arange(self.omit_start_idx, self.omit_stop_idx),
                axis=0,
            ) / np.delete(
                self.bkgd_subtract_data[:, self.int_std_loc][:, None],
                np.arange(self.omit_start_idx, self.omit_stop_idx),
                axis=0,
            )

        else:
            self.bkgd_subtract_normal_data = (
                self.bkgd_subtract_data
                / self.bkgd_subtract_data[:, self.int_std_loc][:, None]
            )

        # get background corrected and normalized median values for an interval
        self.bkgd_subtract_med = np.median(self.bkgd_subtract_normal_data, axis=0)
        self.bkgd_subtract_med[
            np.median(self.bkgd_subtract_data, axis=0) <= threshold
        ] = -9999
        self.bkgd_subtract_med[np.median(self.bkgd_subtract_data, axis=0) == 0] = -9999

        # standard error of the mean for the interval region
        self.bkgd_subtract_std_err = self.bkgd_subtract_normal_data.std(
            axis=0
        ) / np.sqrt(abs(self.int_stop_idx - self.int_start_idx))

        self.bkgd_subtract_std_err_rel = 100 * (
            self.bkgd_subtract_std_err / self.bkgd_subtract_med
        )

    def make_output_report(self):
        """
        create an output report for the spot processing. This is a
        pandas DataFrame that has the following format:

        |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
        |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
        """
        if self.despiked is True:
            despike_col = self.despiked_elements
        else:
            despike_col = "None"

        if self.omitted_region is True:
            omitted_col = (
                self.data["Time"].iloc[self.omit_start_idx + self.int_start_idx],
                self.data["Time"].iloc[self.omit_stop_idx + self.int_start_idx],
            )
        else:
            omitted_col = "None"

        spot_data = pd.DataFrame(
            [
                self.timestamp,
                self.name,
                despike_col,
                omitted_col,
                self.data["Time"].iloc[self.bkgd_start_idx],
                self.data["Time"].iloc[self.bkgd_stop_idx],
                self.data["Time"].iloc[self.int_start_idx],
                self.data["Time"].iloc[self.int_stop_idx],
                self.int_std,
                np.median(self.bkgd_subtract_data[:, self.int_std_loc]),
            ]
        ).T
        spot_data.columns = [
            "timestamp",
            "Spot",
            "despiked",
            "omitted_region",
            "bkgd_start",
            "bkgd_stop",
            "int_start",
            "int_stop",
            "norm",
            "norm_cps",
        ]
        spot_data = pd.concat(
            [
                spot_data,
                pd.DataFrame(
                    self.bkgd_subtract_med[np.newaxis, :], columns=self.analytes
                ),
                pd.DataFrame(
                    self.bkgd_subtract_std_err_rel[np.newaxis, :],
                    columns=[f"{analyte}_se" for analyte in self.analytes],
                ),
            ],
            axis="columns",
        )

        for col in ["bkgd_start", "bkgd_stop", "int_start", "int_stop", "norm_cps"]:
            spot_data[col] = spot_data[col].astype(np.float64)

        self.output_report = spot_data

    def despike_data(self, analyte_list="all", std_devs=4, window=25):
        """
        despike counts per second normalized to an internal standard using a z score filter

        Parameters
        ----------
        analyte_list : str or list, optional
            list of analytes to despike. Accepts singular analytes e.g., "29Si"
            or numerous e.g., ["7Li", "29Si"]. by default "all"
        std_devs : int, optional
            number of standard deviations from the mean to be considered an outlier, by default 4
        window : int, optional
            size of the window to be used in the moving average, by default 25
        """

        assert (
            self.bkgd_subtract_normal_data is not None
        ), "please normalize your data prior to despiking"

        self.despiked = True

        if analyte_list == "all":
            filter_list = self.analytes
        else:
            if isinstance(analyte_list, list):
                pass
            else:
                analyte_list = [analyte_list]

            filter_list = analyte_list

        self.despiked_elements = filter_list

        df = pd.DataFrame(self.bkgd_subtract_normal_data, columns=self.analytes)

        for analyte in filter_list:

            filtered = _z_filter(df[analyte], window=window, std_devs=std_devs)

            # replaces data with despiked data
            df[analyte] = filtered

        self.bkgd_subtract_normal_data = df.loc[:, self.analytes].values

        # now recalculate uncertainties after despiking
        # standard error of the mean for the interval region
        self.bkgd_subtract_std_err = self.bkgd_subtract_normal_data.std(
            axis=0
        ) / np.sqrt(abs(self.int_stop_idx - self.int_start_idx))

        self.bkgd_subtract_std_err_rel = 100 * (
            self.bkgd_subtract_std_err / self.bkgd_subtract_med
        )

__init__(name)

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| name | str  | your sample name, i.e. the value in the SampleLabel column of the LT_ready file | required |
Source code in lasertram\tram\tram.py
def __init__(self, name):
    """

    Args:
        name (str): your sample name i.e. the value in the `SampleLabel` column of the LT_ready file
    """
    # all attributes in relative chronological order that they are created in
    # if everything is done correctly. These all will get rewritten throughout the
    # data processing pipeline but this allows us to see what all the potential attributes
    # are going to be from the beginning (PEP convention)

    # for the math involved please see:

    # name of the lasertram spot object
    self.name = name

    # boolean flag for whether or not the data have been
    # despiked
    self.despiked = False

    # list of elements that have been despiked. Also may be 'all'
    self.despiked_elements = None

    # data from a single spot to be processed. 2D pandas dataframe
    self.data = None

    # self.data but as a 2D numpy matrix. Equivalent to self.data.values
    self.data_matrix = None

    # list of analytes in the analysis
    self.analytes = None

    # datetime corresponding to the analysis
    self.timestamp = None

    # string representation of the internal standard analyte for the processing.
    # this is just the column header of the analyte chosen as the internal
    # standard e.g., "29Si"
    self.int_std = None

    # column number in self.data_matrix that denotes the internal standard analyte
    # data. Remember python starts counting at 0!
    self.int_std_loc = None

    # background interval start time
    self.bkgd_start = None

    # background interval stop time
    self.bkgd_stop = None

    # desired ablation interval start time
    self.int_start = None

    # desired ablation interval stop time
    self.int_stop = None

    # row in self.data corresponding to self.bkgd_start
    self.bkgd_start_idx = None

    # row in self.data corresponding to self.bkgd_stop
    self.bkgd_stop_idx = None

    # row in self.data corresponding to self.int_start
    self.int_start_idx = None

    # row in self.data corresponding to self.int_stop
    self.int_stop_idx = None

    # desired omitted region start time
    self.omit_start = None

    # desired omitted region stop time
    self.omit_stop = None

    # row in self.data corresponding to self.omit_start
    self.omit_start_idx = None

    # row in self.data corresponding to self.omit_stop
    self.omit_stop_idx = None

    # boolean flag for whether or not a portion of the ablation
    # interval has been omitted
    self.omitted_region = None

    # 1D array of median background values [self.bkgd_start - self.bkgd_stop)
    # that is len(analytes) in shape
    self.bkgd_data_median = None

    # 1D array of detection limits in counts per second
    # that is len(analytes) in shape
    self.detection_limits = None

    # 2D array of background corrected data over the self.int_start - self.int_stop
    # region
    self.bkgd_subtract_data = None

    # 2D array of background corrected data over the self.int_start - self.int_stop
    # region that is normalized to the internal standard
    self.bkgd_subtract_normal_data = None

    # 1D array of median background corrected normalized values over the self.int_start - self.int_stop
    # region that is len(analytes) in shape
    self.bkgd_subtract_med = None

    # 1D array of 1 standard error of the mean values for each analyte over the
    # self.int_start - self.int_stop region
    self.bkgd_subtract_std_err = None

    # 1D array of relative standard error of the mean (in percent) for each
    # analyte over the self.int_start - self.int_stop region
    self.bkgd_subtract_std_err_rel = None

    # 1D pandas dataframe that contains many of the attributes created during the
    # LaserTRAM process:
    # |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
    # |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
    self.output_report = None

assign_int_std(int_std)

assigns the spot an internal standard analyte

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| int_std | str | the name of the column for the internal standard analyte e.g., "29Si" | required |
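
For example, assuming "29Si" is one of the analyte columns in the spot data:

spot.assign_int_std("29Si")

# index of "29Si" within spot.analytes (and the matching column in the
# background subtracted arrays)
print(spot.int_std_loc)
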
Source code in lasertram\tram\tram.py
def assign_int_std(self, int_std):
    """assigns the spot an internal standard
    analyte

    Args:
        int_std (str): the name of the column for the internal standard analyte e.g., "29Si"
    """

    # set the internal standard analyte
    self.int_std = int_std

    # get the internal standard array index
    self.int_std_loc = np.where(np.array(self.analytes) == self.int_std)[0][0]

assign_intervals(bkgd, keep, omit=None)

assigns the intervals to be used as background as well as the portion of the ablation interval to be used in calculating concentrations

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| bkgd | tuple | (start, stop) pair of values corresponding to the analysis time where the background signal starts and stops | required |
| keep | tuple | (start, stop) pair of values corresponding to the analysis time where the interval signal for concentrations starts and stops | required |
| omit | tuple | (start, stop) pair of values corresponding to the analysis time to be omitted from the keep interval. Defaults to None. | None |
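
A hedged example with hypothetical times (in seconds, i.e., the units of the converted Time column), omitting a short spike inside the keep interval:

spot.assign_intervals(bkgd=(5, 10), keep=(25, 50), omit=(30, 32))
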
Source code in lasertram\tram\tram.py
def assign_intervals(self, bkgd, keep, omit=None):
    """assigns the intervals to be used as background
    as well as the portion of the ablation interval to
    be used in calculating concentrations

    Args:
        bkgd (tuple): (start, stop) pair of values corresponding to the analysis time where the background signal starts and stops
        keep (tuple): (start, stop) pair of values corresponding to the analysis time where the interval signal for concentrations starts and stops
        omit (tuple): (start, stop) pair of values corresponding to the analysis time to be omitted from the `keep` interval. Defaults to None.
    """

    # set background and interval times in s
    self.bkgd_start = bkgd[0]
    self.bkgd_stop = bkgd[1]
    self.int_start = keep[0]
    self.int_stop = keep[1]

    # equivalent background and interval times but as indices
    # in their respective arrays
    self.bkgd_start_idx = np.where(self.data["Time"] > self.bkgd_start)[0][0]
    self.bkgd_stop_idx = np.where(self.data["Time"] > self.bkgd_stop)[0][0]
    self.int_start_idx = np.where(self.data["Time"] > self.int_start)[0][0]
    self.int_stop_idx = np.where((self.data["Time"] > self.int_stop))[0][0]

    # boolean whether or not there is an omitted region
    self.omitted_region = False
    # if omission is true, set those start and stop times like above
    if omit:
        self.omit_start = omit[0]
        self.omit_stop = omit[1]
        self.omit_start_idx = (
            np.where(self.data["Time"] > self.omit_start)[0][0] - self.int_start_idx
        )
        self.omit_stop_idx = (
            np.where(self.data["Time"] > self.omit_stop)[0][0] - self.int_start_idx
        )

        self.omitted_region = True

despike_data(analyte_list='all', std_devs=4, window=25)

despike counts per second normalized to an internal standard using a z score filter

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| analyte_list | str or list | list of analytes to despike. Accepts singular analytes e.g., "29Si" or numerous e.g., ["7Li", "29Si"] | 'all' |
| std_devs | int | number of standard deviations from the mean to be considered an outlier | 4 |
| window | int | size of the window to be used in the moving average | 25 |
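
Despiking requires that normalize_interval has already been run; a short sketch with hypothetical analytes:

# despike only a couple of analytes
spot.despike_data(analyte_list=["7Li", "29Si"], std_devs=4, window=25)

# or despike every analyte with the defaults
spot.despike_data()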

Source code in lasertram\tram\tram.py
def despike_data(self, analyte_list="all", std_devs=4, window=25):
    """
    despike counts per second normalized to an internal standard using a z score filter

    Parameters
    ----------
    analyte_list : str or list, optional
        list of analytes to despike. Accepts singular analytes e.g., "29Si"
        or numerous e.g., ["7Li", "29Si"]. by default "all"
    std_devs : int, optional
        number of standard deviations from the mean to be considered an outlier, by default 4
    window : int, optional
        size of the window to be used in the moving average, by default 25
    """

    assert (
        self.bkgd_subtract_normal_data is not None
    ), "please normalize your data prior to despiking"

    self.despiked = True

    if analyte_list == "all":
        filter_list = self.analytes
    else:
        if isinstance(analyte_list, list):
            pass
        else:
            analyte_list = [analyte_list]

        filter_list = analyte_list

    self.despiked_elements = filter_list

    df = pd.DataFrame(self.bkgd_subtract_normal_data, columns=self.analytes)

    for analyte in filter_list:

        filtered = _z_filter(df[analyte], window=window, std_devs=std_devs)

        # replaces data with despiked data
        df[analyte] = filtered

    self.bkgd_subtract_normal_data = df.loc[:, self.analytes].values

    # now recalculate uncertainties after despiking
    # standard error of the mean for the interval region
    self.bkgd_subtract_std_err = self.bkgd_subtract_normal_data.std(
        axis=0
    ) / np.sqrt(abs(self.int_stop_idx - self.int_start_idx))

    self.bkgd_subtract_std_err_rel = 100 * (
        self.bkgd_subtract_std_err / self.bkgd_subtract_med
    )

get_bkgd_data()

uses the intervals assigned in assign_intervals to take the median value of all analytes within that range and use them as the background signal that gets subtracted from the ablation signal
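
Usage is a single call on a spot that has already had its intervals assigned; the result is one median background value (in cps) per analyte:

spot.get_bkgd_data()

# one median background value per analyte
assert spot.bkgd_data_median.shape[0] == len(spot.analytes)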

Source code in lasertram\tram\tram.py
def get_bkgd_data(self):
    """
    uses the intervals assigned in `assign_intervals` to take the median
    value of all analytes within that range and use them as the
    background signal that gets subtracted from the ablation signal
    """
    # median background data values
    self.bkgd_data_median = np.median(
        self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
    )

get_data(df, time_units='ms')

assigns raw counts/sec data to the object

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| df | pandas DataFrame | raw data corresponding to the spot being processed i.e., all_data.loc[spot,:] if all_data is the LT_ready file | required |
| time_units | str | string denoting the units for the Time column. Used to convert input time values to seconds. Defaults to 'ms'. | 'ms' |
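
A short sketch, assuming all_data is the LT_ready DataFrame indexed by SampleLabel and the spot label is hypothetical:

spot = LaserTRAM(name="unknown_glass_1")
spot.get_data(all_data.loc["unknown_glass_1", :], time_units="ms")

print(spot.analytes)   # e.g., ["7Li", "24Mg", "29Si", ...]
print(spot.timestamp)  # analysis datetime as a string
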
Source code in lasertram\tram\tram.py
def get_data(self, df, time_units="ms"):
    """assigns raw counts/sec data to the object

    Args:
        df (pandas DataFrame): raw data corresponding to the spot being processed i.e., `all_data.loc[spot,:]` if `all_data` is the LT_ready file
        time_units (str): string denoting the units for the `Time` column. Used to convert input time values to seconds. Defaults to 'ms'.
    """
    # get data and set index to "SampleLabel" column
    self.data = df.reset_index()
    self.data = self.data.set_index("SampleLabel")

    # convert time units from ms --> s if applicable
    if time_units == "ms":
        self.data["Time"] = self.data["Time"] / 1000
    elif time_units == "s":
        pass

    # just numpy matrix for data
    self.data_matrix = self.data.iloc[:, 1:].to_numpy()

    # list of analytes in experiment
    self.analytes = self.data.loc[:, "Time":].columns.tolist()[1:]

    # need to add check for if this exists otherwise there is no timestamp attribute
    self.timestamp = str(self.data.loc[:, "timestamp"].unique()[0])

get_detection_limits()

Calculates detection limits in counts per second for each analyte. This is defined as the median background value plus three standard deviations of the background signal.
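
Equivalently, for each analyte the limit of detection is LOD = median(background) + 3 * std(background); a small standalone sketch with made-up background values:

import numpy as np

# hypothetical background signal for one analyte (counts per second)
bkgd = np.array([100.0, 120.0, 110.0, 90.0, 105.0])

lod = np.median(bkgd) + 3 * np.std(bkgd)  # detection limit in cps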

Source code in lasertram\tram\tram.py
def get_detection_limits(self):
    """
    Calculates detection limits in counts per second for each analyte. This
    is defined as the median background value plus three standard deviations
    of the background signal.
    """

    self.detection_limits = np.std(
        self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
    ) * 3 + np.median(
        self.data_matrix[self.bkgd_start_idx : self.bkgd_stop_idx, 1:], axis=0
    )

make_output_report()

create an output report for the spot processing. This is a pandas DataFrame that has the following format:

|timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
|---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
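
Reports from many spots are typically stacked into one DataFrame that can then be handed to LaserCalc.get_data; a sketch assuming spots is a list of fully processed LaserTRAM objects:

import pandas as pd

for spot in spots:
    spot.make_output_report()

lt_output = pd.concat([spot.output_report for spot in spots], ignore_index=True)
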
Source code in lasertram\tram\tram.py
def make_output_report(self):
    """
    create an output report for the spot processing. This is a
    pandas DataFrame that has the following format:

    |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
    |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
    """
    if self.despiked is True:
        despike_col = self.despiked_elements
    else:
        despike_col = "None"

    if self.omitted_region is True:
        omitted_col = (
            self.data["Time"].iloc[self.omit_start_idx + self.int_start_idx],
            self.data["Time"].iloc[self.omit_stop_idx + self.int_start_idx],
        )
    else:
        omitted_col = "None"

    spot_data = pd.DataFrame(
        [
            self.timestamp,
            self.name,
            despike_col,
            omitted_col,
            self.data["Time"].iloc[self.bkgd_start_idx],
            self.data["Time"].iloc[self.bkgd_stop_idx],
            self.data["Time"].iloc[self.int_start_idx],
            self.data["Time"].iloc[self.int_stop_idx],
            self.int_std,
            np.median(self.bkgd_subtract_data[:, self.int_std_loc]),
        ]
    ).T
    spot_data.columns = [
        "timestamp",
        "Spot",
        "despiked",
        "omitted_region",
        "bkgd_start",
        "bkgd_stop",
        "int_start",
        "int_stop",
        "norm",
        "norm_cps",
    ]
    spot_data = pd.concat(
        [
            spot_data,
            pd.DataFrame(
                self.bkgd_subtract_med[np.newaxis, :], columns=self.analytes
            ),
            pd.DataFrame(
                self.bkgd_subtract_std_err_rel[np.newaxis, :],
                columns=[f"{analyte}_se" for analyte in self.analytes],
            ),
        ],
        axis="columns",
    )

    for col in ["bkgd_start", "bkgd_stop", "int_start", "int_stop", "norm_cps"]:
        spot_data[col] = spot_data[col].astype(np.float64)

    self.output_report = spot_data

normalize_interval()

normalize the analytes from the "keep" portion of the signal to the internal standard analyte. This is done by simply dividing the analytes by the internal standard analyte.

This also calculates the median normalized value, its standard error of the mean, and relative standard error of the mean.
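
Conceptually the normalization and uncertainty calculation look like the simplified numpy sketch below; the class additionally removes any omitted region and flags values below detection with -9999:

import numpy as np

rng = np.random.default_rng(0)

# stand-in for background subtracted signal: (n_sweeps, n_analytes)
counts = rng.uniform(1e3, 1e5, size=(50, 5))
int_std_loc = 2  # column index of the internal standard analyte

normalized = counts / counts[:, int_std_loc][:, None]

median_ratio = np.median(normalized, axis=0)
std_err = normalized.std(axis=0) / np.sqrt(normalized.shape[0])
rel_std_err = 100 * std_err / median_ratio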

Source code in lasertram\tram\tram.py
def normalize_interval(self):
    """
    normalize the analytes from the "keep" portion of the signal
    to the internal standard analyte. This is done by simply
    dividing the analytes by the internal standard analyte.

    This also calculates the median normalized value, its
    standard error of the mean, and relative standard error
    of the mean.
    """

    # set the detection limit thresholds to be checked against
    # the interval data. This is just the detection limits with the
    # median background contribution removed, so they can be compared
    # directly with background subtracted data
    threshold = self.detection_limits - self.bkgd_data_median

    # if there's an omitted region, remove it from the data to be further processed
    # for the chosen interval
    if self.omitted_region is True:
        self.bkgd_subtract_normal_data = np.delete(
            self.bkgd_subtract_data,
            np.arange(self.omit_start_idx, self.omit_stop_idx),
            axis=0,
        ) / np.delete(
            self.bkgd_subtract_data[:, self.int_std_loc][:, None],
            np.arange(self.omit_start_idx, self.omit_stop_idx),
            axis=0,
        )

    else:
        self.bkgd_subtract_normal_data = (
            self.bkgd_subtract_data
            / self.bkgd_subtract_data[:, self.int_std_loc][:, None]
        )

    # get background corrected and normalized median values for an interval
    self.bkgd_subtract_med = np.median(self.bkgd_subtract_normal_data, axis=0)
    self.bkgd_subtract_med[
        np.median(self.bkgd_subtract_data, axis=0) <= threshold
    ] = -9999
    self.bkgd_subtract_med[np.median(self.bkgd_subtract_data, axis=0) == 0] = -9999

    # standard error of the mean for the interval region
    self.bkgd_subtract_std_err = self.bkgd_subtract_normal_data.std(
        axis=0
    ) / np.sqrt(abs(self.int_stop_idx - self.int_start_idx))

    self.bkgd_subtract_std_err_rel = 100 * (
        self.bkgd_subtract_std_err / self.bkgd_subtract_med
    )

subtract_bkgd()

subtract the median background values calculated in get_bkgd_data from the signal in the "keep" interval established in assign_intervals
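
After get_bkgd_data has been run, this is a single call; the result spans the keep interval and has one column per analyte:

spot.subtract_bkgd()

# rows = sweeps inside the keep interval, columns = analytes
assert spot.bkgd_subtract_data.shape[1] == len(spot.analytes)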

Source code in lasertram\tram\tram.py
def subtract_bkgd(self):
    """
    subtract the median background values calculated in `get_bkgd_data`
    from the signal in the "keep" interval established in `assign_intervals`

    """
    self.bkgd_subtract_data = (
        self.data_matrix[self.int_start_idx : self.int_stop_idx, 1:]
        - self.bkgd_data_median
    )

LaserCalc

The class LaserCalc is devoted to calculating concentrations for laser ablation ICP-MS spot or line-of-spots data following the methodology of Longerich et al. (1996) and Kent and Ungerer (2006). It should be used in conjunction with the output from the LaserTRAM class. The basic steps are as follows:

  1. upload SRM data
  2. upload LaserTRAM output
  3. set the calibration standard
  4. set the internal standard concentrations for the unknowns
  5. calculate the concentrations and uncertainties of all analyses

References

  • Longerich, H. P., Jackson, S. E., & Günther, D. (1996). Inter-laboratory note. Laser ablation inductively coupled plasma mass spectrometric transient signal data acquisition and analyte concentration calculation. Journal of analytical atomic spectrometry, 11(9), 899-904.
  • Kent, A. J., & Ungerer, C. A. (2006). Analysis of light lithophile elements (Li, Be, B) by laser ablation ICP-MS: comparison between magnetic sector and quadrupole ICP-MS. American Mineralogist, 91(8-9), 1401-1411.
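
Putting the numbered steps above together, a hedged sketch of the workflow (assuming LaserCalc is importable as from lasertram import LaserCalc, that lt_output is a concatenation of LaserTRAM output reports, and that the file name, calibration standard, and internal standard values are hypothetical):

import numpy as np
import pandas as pd

from lasertram import LaserCalc

concentrations = LaserCalc(name="my_experiment")

# 1. upload SRM data (GEOREM style sheet with a "Standard" column)
concentrations.get_SRM_comps(pd.read_excel("laicpms_stds_tidy.xlsx"))

# 2. upload the LaserTRAM output
concentrations.get_data(lt_output)

# 3. set the calibration standard, then characterize drift and
#    concentration ratios for that standard
concentrations.set_calibration_standard("NIST-612")
concentrations.drift_check()
concentrations.get_calibration_std_ratios()

# 4. set internal standard concentrations (and relative uncertainties in %)
#    for the unknowns; 71.9 and 1.0 are hypothetical values
concentrations.set_int_std_concentrations(
    spots=concentrations.data["Spot"],
    concentrations=np.full(concentrations.data["Spot"].shape[0], 71.9),
    uncertainties=np.full(concentrations.data["Spot"].shape[0], 1.0),
)

# 5. calculate concentrations and uncertainties for all analyses
concentrations.calculate_concentrations()
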
Source code in lasertram\calc\calc.py
class LaserCalc:
    """
    # LaserCalc

    The class `LaserCalc` which is devoted to calculating
    concentrations for laser ablation ICP-MS spot or
    line of spots data following the methodology of
    Longerich et al., (1996) and Kent and Ungerer (2006). It should be used in conjunction
    with the output from `LaserTRAM` class. The basic steps are as follows:

    1. upload SRM data
    2. upload `LaserTRAM` output
    3. set the calibration standard
    4. set the internal standard concentrations for the unknowns
    5. calculate the concentrations and uncertainties of all analyses

    References


    - Longerich, H. P., Jackson, S. E., & Günther, D. (1996). Inter-laboratory note.
            Laser ablation inductively coupled plasma mass spectrometric transient signal
            data acquisition and analyte concentration calculation. Journal of analytical
            atomic spectrometry, 11(9), 899-904.
    - Kent, A. J., & Ungerer, C. A. (2006). Analysis of light lithophile elements
            (Li, Be, B) by laser ablation ICP-MS: comparison between magnetic sector and
            quadrupole ICP-MS. American Mineralogist, 91(8-9), 1401-1411.


    """

    def __init__(self, name):
        """


        Args:
            name (str): The name of the experiment to be processed
        """
        # all attributes in relative chronological order that they are created in
        # if everything is done correctly. These all will get rewritten throughout the
        # data processing pipeline but this allows us to see what all the potential attributes
        # are going to be from the beginning (PEP convention)

        # for the math involved please see:

        # name for the lasercalc object
        # for notekeeping
        self.name = name

        # 2D pandas dataframe of standards reference material preferred compositions
        # from georem
        self.standards_data = None

        # List of standard reference materials in self.standards_data
        self.database_standards = None

        # list of standard reference material elements/oxides in self.standards_data
        self.standard_elements = None

        # list of standard reference material element/oxide 1 sigma uncertainties in self.standards_data
        self.standard_element_uncertainties = None

        # list of spot analyses for which concentrations are being calculated
        # this is the equivalent of self.data['Spot']
        self.spots = None

        # list of analytes for which concentrations are being calculated
        # these are column headers in self.data
        self.analytes = None

        # 1 sigma standard deviation of the calibration standard values
        # in self.data. Is len(analytes) in shape
        self.calibration_std_stdevs = None

        # 2D pandas dataframe that represents the metadata and data for numerous
        # spot analyses. Each row is the equivalent of a LaserTRAM.output_report
        # and has the following columns:
        # |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
        # |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
        self.data = None

        # element used as internal standard. NOT to be confused with analyte
        # e.g. self.int_std_element == 'Si' NOT '29Si'
        self.int_std_element = None

        # list of standard reference materials in found in self.data that are
        # also found in self.database_standards. This lets you know which standard reference
        # materials you can use as potential calibration standards
        self.potential_calibration_standards = None

        # list of samples in self.data with the self.potential_calibration_standards
        # removed
        self.samples_nostandards = None

        # list of elements for which concentrations are being calculated
        # this is the equivalent to self.analytes with the atomic masses
        # removed
        self.elements = None

        # string representing the standard reference material used
        # as the calibration standard for calculating concentrations
        self.calibration_std = None

        # 2D pandas dataframe which is a subset of self.data for only the
        # calibration standard data. This is essentially self.data.loc[self.calibration_std,:]
        self.calibration_std_data = None

        # mean calibration standard values for all analytes
        # equivalent of self.calibration_std_data.mean(axis = 0)
        self.calibration_std_means = None

        # calibration standard standard error of the mean for all analytes
        self.calibration_std_ses = None

        # 2D dataframe that is contains statistics for each analyte in self.calibration_std_data
        # columns are:
        # drift_correct | f_pval | f_value | f_crit_value | rmse | slope | intercept | mean | std_dev | percent_std_err
        # These stats are based on the following regression:
        # for each analyte
        # x = self.calibration_std_data.loc[:,'timestamp']
        # y = self.calibration_std_data.loc[:, analyte]

        # X = sm.add_constant(x)
        # Note the difference in argument order
        # model = sm.OLS(y, X).fit()
        # now generate predictions
        # ypred = model.predict(X)

        # calc rmse
        # RMSE = rmse(y, ypred)

        self.calibration_std_stats = None

        # the ratio of concentrations between an analyte and the internal standard
        # in the georem calibration standard values
        self.calibration_std_conc_ratios = None

        # list of standard reference materials that are not used as calibration standard
        # this is effectively self.potential_calibration_standards with self.calibration_std
        # removed
        self.secondary_standards = None

        # 2D pandas dataframe of calculated concentrations for all spots in self.secondary_standards and all
        # analytes in self.analytes. This is self.data.loc[self.secondary_standards,self.analytes].shape in shape
        self.SRM_concentrations = None

        # 2D pandas dataframe of calculated concentrations for all spots in self.spots and all
        # analytes in self.analytes. This is self.data.loc[self.spots,self.analytes].shape in shape
        self.unknown_concentrations = None

        # 2D pandas dataframe of calculated accuracies for all spots in self.secondary_standards and all
        # analytes in self.analytes. This is self.data.loc[self.secondary_standards,self.analytes].shape in shape
        # here accuracy is just 100*measured_concentration / georem_concentration
        self.SRM_accuracies = None

    def get_SRM_comps(self, df):
        """load in a database of standard reference material compositions

        Args:
            df (pandas DataFrame): pandas DataFrame of standard reference materials
        where each row represents data for a standard reference material.
        The first column should be named "Standard". All other columns are
        for different elemental concentrations. Standard names must be exact
        names found in GEOREM: http://georem.mpch-mainz.gwdg.de/sample_query_pref.asp
        """

        self.standards_data = df.set_index("Standard")
        self.database_standards = self.standards_data.index.unique().to_list()
        # Get a list of all of the elements supported in the published standard datasheet
        # Get a second list for the same elements but their corresponding uncertainty columns
        self.standard_elements = [
            analyte
            for analyte in self.standards_data.columns.tolist()
            if "_std" not in analyte
        ]
        self.standard_element_uncertainties = [
            analyte + "_std" for analyte in self.standard_elements
        ]

    def get_data(self, df):
        """load in output from `LaserTRAM` for calculation of concentrations

        Args:
            df (pandas DataFrame): a 2D pandas DataFrame representing numerous concatenated calls to `LaserTRAM.make_output_report()`

        """
        # check if first row is nan (output from GUI does this).
        # If so, remove it
        df = df[df.iloc[:, 0].isna() == False]

        data = df.set_index("Spot")
        data.insert(loc=1, column="index", value=np.arange(1, len(data) + 1))

        self.spots = data.index.unique().dropna().tolist()

        # Check for potential calibration standards. This will let us know what our options
        # are for choosing calibration standards by looking for spots that have the same string
        # as the standard spreadsheet

        stds_column = [
            [std for std in self.database_standards if std in spot]
            for spot in self.spots
        ]

        stds_column = [["unknown"] if not l else l for l in stds_column]

        stds_column = [std for sublist in stds_column for std in sublist]

        # standards that can be used as calibrations standards (must have more than 1 analysis)
        # potential_standards = list(np.unique(stds_column))
        potential_standards = [
            std for std in np.unique(stds_column) if stds_column.count(std) > 1
        ]
        potential_standards.remove("unknown")

        # all of the samples in your input sheet that are NOT potential standards
        all_standards = list(np.unique(stds_column))
        all_standards.remove("unknown")

        data["sample"] = stds_column

        data.reset_index(inplace=True)
        data.set_index("sample", inplace=True)

        self.data = data
        self.potential_calibration_standards = potential_standards
        self.samples_nostandards = list(np.setdiff1d(stds_column, all_standards))

        self.analytes = [
            analyte
            for analyte in data.columns.tolist()
            if not (
                "_se" in analyte
                or "norm" in analyte
                or "index" in analyte
                or "Spot" in analyte
                or "wt%" in analyte
                or "1stdev%" in analyte
                or "start" in analyte
                or "stop" in analyte
                or "long" in analyte
                or "timestamp" in analyte
                or "despiked" in analyte
                or "omitted_region" in analyte
            )
        ]
        # elements without isotopes in the front
        self.elements = [re.split(r"(\d+)", analyte)[2] for analyte in self.analytes]

        # internal standard analyte from lasertram
        self.int_std_element = re.split(r"(\d+)", self.data["norm"].unique()[0])[2]

    def set_calibration_standard(self, std):
        """Assign which standard reference material will be the calibration
        standard for calculating concentrations.

        Args:
            std (str): name of standard reference material (e.g., `NIST-612`,`BCR-2G`)
        """
        self.calibration_std = std

        self.calibration_std_data = self.data.loc[std, :]
        # Calibration standard information
        # mean
        self.calibration_std_means = self.calibration_std_data.loc[
            :, self.analytes
        ].mean()
        # std deviation
        self.calibration_std_stdevs = self.calibration_std_data.loc[
            :, self.analytes
        ].std()
        # relative standard error
        self.calibration_std_ses = 100 * (
            (self.calibration_std_stdevs / self.calibration_std_means)
            / np.sqrt(self.calibration_std_data.shape[0])
        )

    def drift_check(self, pval=0.01):
        """For each analyte in the experiment, perform a linear regression to
        assess whether or not drift in the mass spectrometer is happening at a
        significant level. Significance is determined by setting the `pval` threshold.
        If the regression is statistically significant, it gets flagged for drift
        correction later in `calculate_concentrations`



        Parameters
        ----------
        pval : float, optional
            significance threshold to reject the null hypothesis for drift correction, by default 0.01
        """
        calib_std_rmses = []
        calib_std_slopes = []
        calib_std_intercepts = []
        drift_check = []

        f_pvals = []
        f_vals = []
        f_crits = []
        for analyte in self.analytes:
            # Getting regression statistics on analyte normalized ratios through time
            # for the calibration standard. This is what we use to check to see if it needs
            # to be drift corrected
            if "timestamp" in self.calibration_std_data.columns.tolist():
                # get an array of time values based on the timestamp column.
                # this is in minutes (np.datetime64 with "m" precision cast to float)
                x = np.array(
                    [
                        np.datetime64(d, "m")
                        for d in self.calibration_std_data["timestamp"]
                    ]
                ).astype(np.float64)
                # x = np.cumsum(np.diff(x))
                # x = np.insert(x, 0, 0).astype(np.float64)

            else:
                x = self.calibration_std_data["index"].to_numpy()

            y = self.calibration_std_data.loc[:, analyte].astype("float64")

            X = sm.add_constant(x)
            # Note the difference in argument order
            model = sm.OLS(y, X).fit()
            # now generate predictions
            ypred = model.predict(X)

            # calc rmse
            RMSE = rmse(y, ypred)

            calib_std_rmses.append(RMSE)

            if model.params.shape[0] < 2:
                calib_std_slopes.append(model.params.loc["x1"])
                calib_std_intercepts.append(0)

            else:
                calib_std_slopes.append(model.params.loc["x1"])
                calib_std_intercepts.append(model.params.loc["const"])

            # new stuff
            # confidence limit 99%

            # f value stuff

            fvalue = model.fvalue
            f_vals.append(fvalue)
            f_pvalue = model.f_pvalue
            f_pvals.append(f_pvalue)
            fcrit = stats.f.ppf(q=1 - pval, dfn=len(x) - 1, dfd=len(y) - 1)
            f_crits.append(fcrit)
            if (f_pvalue < pval) and (fvalue > fcrit):
                drift = "True"
                drift_check.append(drift)
            else:
                drift = "False"
                drift_check.append(drift)

        self.calibration_std_stats = pd.DataFrame(
            {
                "drift_correct": drift_check,
                "f_pval": f_pvals,
                "f_value": f_vals,
                "f_crit_value": f_crits,
                "rmse": calib_std_rmses,
                "slope": calib_std_slopes,
                "intercept": calib_std_intercepts,
                "mean": self.calibration_std_means[self.analytes].to_numpy(),
                "std_dev": self.calibration_std_stdevs[self.analytes].to_numpy(),
                "percent_std_err": self.calibration_std_ses[self.analytes].to_numpy(),
            },
            index=self.analytes,
        )

    def get_calibration_std_ratios(self):
        """
        For the calibration standard, calculate the concentration ratio between every analyte and the internal standard.
        """

        # For our calibration standard, calculate the concentration ratio
        # of each analyte to the element used as the internal standard
        std_conc_ratios = []

        for element in self.elements:
            if element in self.standard_elements:
                std_conc_ratios.append(
                    self.standards_data.loc[self.calibration_std, element]
                    / self.standards_data.loc[
                        self.calibration_std, self.int_std_element
                    ]
                )

        # make our list an array for easier math going forward
        self.calibration_std_conc_ratios = np.array(std_conc_ratios)

    def set_int_std_concentrations(
        self,
        spots=None,
        concentrations=None,
        uncertainties=None,
    ):
        """Assign the concentration and uncertainty of the internal standard analyte to
        a series of spots.

        Briefly...a linear change in the concentration value reflects a linear change
        in the calculated concentration.

        Args:
            spots (pandas Series): pandas series containing the names of the spots to have their internal standard concentration and uncertainty assigned. This is the `Spot` column from the output of `LaserTRAM`.

            concentrations (array-like): values representing the internal standard concentration. Must be the same shape as `spots`.
            uncertainties (array-like): values representing the internal standard relative uncertainty in percent. Must be the same shape as `spots`.
        """
        if spots is None:
            spots = (self.data["Spot"],)
            concentrations = (np.full(self.data["Spot"].shape[0], 10),)
            uncertainties = (np.full(self.data["Spot"].shape[0], 1),)

        self.data["int_std_comp"] = 10.0
        self.data["int_std_rel_unc"] = 1.0
        df = self.data.reset_index().set_index("Spot")

        for spot, concentration, uncertainty in zip(
            spots, concentrations, uncertainties
        ):
            df.loc[spot, "int_std_comp"] = concentration
            df.loc[spot, "int_std_rel_unc"] = uncertainty

        self.data["int_std_comp"] = df["int_std_comp"].to_numpy()
        self.data["int_std_rel_unc"] = df["int_std_rel_unc"].to_numpy()

    def calculate_concentrations(self):
        """
        Calculates the concentration and uncertainty of all spots in the experiment
        using the user specified calibration standard and internal standard
        concentrations/uncertainties.

        """

        secondary_standards = self.potential_calibration_standards.copy()
        secondary_standards.remove(self.calibration_std)
        self.secondary_standards = secondary_standards
        secondary_standards_concentrations_list = []
        unknown_concentrations_list = []

        for sample in secondary_standards:
            Cn_u = self.standards_data.loc[
                sample,
                re.split(
                    r"(\d+)",
                    self.calibration_std_data["norm"].unique()[0],
                )[2],
            ]
            Cin_std = self.calibration_std_conc_ratios
            Ni_std = self.calibration_std_stats["mean"][self.analytes]
            Ni_u = self.data.loc[sample, self.analytes]

            concentrations = Cn_u * (Cin_std / Ni_std) * Ni_u

            drift_concentrations_list = []

            for j, analyte, slope, intercept, drift in zip(
                range(len(self.analytes)),
                self.analytes,
                self.calibration_std_stats["slope"],
                self.calibration_std_stats["intercept"],
                self.calibration_std_stats["drift_correct"],
            ):
                if "True" in drift:
                    if "timestamp" in self.data.columns.tolist():
                        frac = (
                            slope
                            * np.array(
                                [
                                    np.datetime64(d, "m")
                                    for d in self.data.loc[sample, "timestamp"]
                                ]
                            ).astype(np.float64)
                            + intercept
                        )
                    else:
                        frac = slope * self.data.loc[sample, "index"] + intercept

                    Ni_std = frac

                    drift_concentrations = Cn_u * (Cin_std[j] / Ni_std) * Ni_u[analyte]

                    if isinstance(drift_concentrations, np.float64):
                        df = pd.DataFrame(
                            np.array([drift_concentrations]), columns=[analyte]
                        )

                    else:
                        df = pd.DataFrame(drift_concentrations, columns=[analyte])

                    drift_concentrations_list.append(df)

            if len(drift_concentrations_list) > 0:
                drift_df = pd.concat(drift_concentrations_list, axis="columns")

                if drift_df.shape[0] == 1:
                    drift_df["sample"] = sample
                    drift_df.set_index("sample", inplace=True)
            else:
                drift_df = pd.DataFrame()

            for column in drift_df.columns.tolist():
                if isinstance(concentrations, pd.Series):
                    concentrations.loc[column] = drift_df[column].to_numpy()[0]

                else:
                    concentrations[column] = drift_df[column].to_numpy()

            if isinstance(concentrations, pd.Series):
                concentrations = pd.DataFrame(concentrations).T
                concentrations["sample"] = sample
                concentrations.set_index("sample", inplace=True)

            secondary_standards_concentrations_list.append(concentrations)

        ###############################
        for sample in self.samples_nostandards:
            Cn_u = conversions.oxide_to_ppm(
                self.data.loc[sample, "int_std_comp"],
                self.data.loc[sample, "norm"].unique()[0],
            ).to_numpy()
            Cin_std = self.calibration_std_conc_ratios
            Ni_std = self.calibration_std_stats["mean"][self.analytes].to_numpy()
            Ni_u = self.data.loc[sample, self.analytes].to_numpy()

            concentrations = pd.DataFrame(
                Cn_u[:, np.newaxis] * (Cin_std / Ni_std) * Ni_u, columns=self.analytes
            )

            drift_concentrations_list = []

            for j, analyte, slope, intercept, drift in zip(
                range(len(self.analytes)),
                self.analytes,
                self.calibration_std_stats["slope"],
                self.calibration_std_stats["intercept"],
                self.calibration_std_stats["drift_correct"],
            ):
                if "True" in drift:
                    if "timestamp" in self.data.columns.tolist():
                        frac = (
                            slope
                            * np.array(
                                [
                                    np.datetime64(d, "m")
                                    for d in self.data.loc[sample, "timestamp"]
                                ]
                            ).astype(np.float64)
                            + intercept
                        )
                    else:
                        frac = slope * self.data.loc[sample, "index"] + intercept
                    frac = np.array(frac)
                    drift_concentrations = (
                        Cn_u[:, np.newaxis]
                        * (Cin_std[j] / frac)[:, np.newaxis]
                        * Ni_u[:, j][:, np.newaxis]
                    )

                    if isinstance(drift_concentrations, np.float64):
                        df = pd.DataFrame(
                            np.array([drift_concentrations]), columns=[analyte]
                        )

                    else:
                        df = pd.DataFrame(drift_concentrations, columns=[analyte])

                    drift_concentrations_list.append(df)

            if len(drift_concentrations_list) > 0:
                drift_df = pd.concat(drift_concentrations_list, axis="columns")

                if drift_df.shape[0] == 1:
                    drift_df["sample"] = sample
                    drift_df.set_index("sample", inplace=True)

            for column in drift_df.columns.tolist():
                if isinstance(concentrations, pd.Series):
                    concentrations.loc[column] = drift_df[column].to_numpy()[0]

                else:
                    concentrations[column] = drift_df[column].to_numpy()

            if isinstance(concentrations, pd.Series):
                concentrations = pd.DataFrame(concentrations).T
                concentrations["sample"] = sample
                concentrations.set_index("sample", inplace=True)

            unknown_concentrations_list.append(concentrations)

        self.SRM_concentrations = pd.concat(secondary_standards_concentrations_list)
        self.unknown_concentrations = pd.concat(unknown_concentrations_list)

        self.calculate_uncertainties()

        # ADD IN SPOT METADATA NOW

        self.unknown_concentrations[self.unknown_concentrations < 0] = "b.d.l."
        self.SRM_concentrations[self.SRM_concentrations < 0] = "b.d.l."

        self.SRM_concentrations.insert(
            0, "Spot", list(self.data.loc[self.secondary_standards, "Spot"])
        )

        if "timestamp" in self.data.columns.tolist():
            self.SRM_concentrations.insert(
                0,
                "timestamp",
                list(self.data.loc[self.secondary_standards, "timestamp"]),
            )
        else:
            self.SRM_concentrations.insert(
                0, "index", list(self.data.loc[self.secondary_standards, "index"])
            )
        self.unknown_concentrations.insert(
            0, "Spot", list(self.data.loc[self.samples_nostandards, "Spot"])
        )
        if "timestamp" in self.data.columns.tolist():
            self.unknown_concentrations.insert(
                0,
                "timestamp",
                list(self.data.loc[self.samples_nostandards, "timestamp"]),
            )
        else:
            self.unknown_concentrations.insert(
                0, "index", list(self.data.loc[self.samples_nostandards, "index"])
            )

        self.unknown_concentrations.index = [
            "unknown"
        ] * self.unknown_concentrations.shape[0]
        self.unknown_concentrations.index.name = "sample"

    def calculate_uncertainties(self):
        """
        Calculate the uncertainties for each analysis.

        """

        myuncertainties = [analyte + "_se" for analyte in self.analytes]
        srm_rel_ext_uncertainties_list = []
        unk_rel_ext_uncertainties_list = []
        srm_rel_int_uncertainties_list = []
        unk_rel_int_uncertainties_list = []
        # use RMSE of regression for elements where drift correction is applied rather than the standard error
        # of the mean of all the calibration standard normalized ratios
        rse_i_std = []
        for analyte in self.analytes:
            if "True" in self.calibration_std_stats.loc[analyte, "drift_correct"]:
                rse_i_std.append(
                    100
                    * self.calibration_std_stats.loc[analyte, "rmse"]
                    / self.calibration_std_stats.loc[analyte, "mean"]
                )
            else:
                rse_i_std.append(
                    self.calibration_std_stats.loc[analyte, "percent_std_err"]
                )

        rse_i_std = np.array(rse_i_std)

        for sample in self.secondary_standards:
            t1 = (
                self.standards_data.loc[sample, f"{self.int_std_element}_std"]
                / self.standards_data.loc[sample, f"{self.int_std_element}"]
            ) ** 2

            # concentration of internal standard in calibration standard uncertainties
            t2 = (
                self.standards_data.loc[
                    self.calibration_std, f"{self.int_std_element}_std"
                ]
                / self.standards_data.loc[
                    self.calibration_std, f"{self.int_std_element}"
                ]
            ) ** 2

            # concentration of each analyte in calibration standard uncertainties
            std_conc_stds = []
            for element in self.elements:
                # if our element is in the list of standard elements take the ratio
                if element in self.standard_elements:
                    std_conc_stds.append(
                        (
                            self.standards_data.loc[
                                self.calibration_std, f"{element}_std"
                            ]
                            / self.standards_data.loc[self.calibration_std, element]
                        )
                        ** 2
                    )

            std_conc_stds = np.array(std_conc_stds)

            # overall relative uncertainties: quadrature sum of the relative
            # uncertainty terms above and the spot measurement uncertainties

            rel_ext_uncertainty = pd.DataFrame(
                np.sqrt(
                    np.array(
                        t1
                        + t2
                        + std_conc_stds
                        + (rse_i_std[np.newaxis, :] / 100) ** 2
                        + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                    ).astype(np.float64)
                )
            )
            rel_int_uncertainty = pd.DataFrame(
                np.sqrt(
                    np.array(
                        t1
                        # +t2
                        # + std_conc_stds
                        + (rse_i_std[np.newaxis, :] / 100) ** 2
                        + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                    ).astype(np.float64)
                )
            )
            rel_ext_uncertainty.columns = [f"{a}_exterr" for a in self.analytes]
            srm_rel_ext_uncertainties_list.append(rel_ext_uncertainty)
            rel_int_uncertainty.columns = [f"{a}_interr" for a in self.analytes]
            srm_rel_int_uncertainties_list.append(rel_int_uncertainty)

        srm_rel_ext_uncertainties = pd.concat(srm_rel_ext_uncertainties_list)
        srm_rel_int_uncertainties = pd.concat(srm_rel_int_uncertainties_list)

        srm_ext_uncertainties = pd.DataFrame(
            srm_rel_ext_uncertainties.values
            * self.SRM_concentrations.loc[:, self.analytes].values,
            columns=[f"{a}_exterr" for a in self.analytes],
            index=self.SRM_concentrations.index,
        )
        srm_int_uncertainties = pd.DataFrame(
            srm_rel_int_uncertainties.values
            * self.SRM_concentrations.loc[:, self.analytes].values,
            columns=[f"{a}_interr" for a in self.analytes],
            index=self.SRM_concentrations.index,
        )

        self.SRM_concentrations = pd.concat(
            [self.SRM_concentrations, srm_ext_uncertainties, srm_int_uncertainties],
            axis="columns",
        )

        ######################################

        for sample in self.samples_nostandards:
            # concentration of internal standard in unknown uncertainties
            int_std_element = re.split(
                r"(\d+)", self.calibration_std_data["norm"].unique()[0]
            )[2]
            # concentration of internal standard in unknown uncertainties
            t1 = (self.data.loc[sample, "int_std_rel_unc"] / 100) ** 2
            t1 = np.array(t1)
            t1 = t1[:, np.newaxis]

            # concentration of internal standard in calibration standard uncertainties
            t2 = (
                self.standards_data.loc[self.calibration_std, f"{int_std_element}_std"]
                / self.standards_data.loc[self.calibration_std, f"{int_std_element}"]
            ) ** 2

            # concentration of each analyte in calibration standard uncertainties
            std_conc_stds = []
            for element in self.elements:
                # if our element is in the list of standard elements take the ratio
                if element in self.standard_elements:
                    std_conc_stds.append(
                        (
                            self.standards_data.loc[
                                self.calibration_std, f"{element}_std"
                            ]
                            / self.standards_data.loc[self.calibration_std, element]
                        )
                        ** 2
                    )

            std_conc_stds = np.array(std_conc_stds)

            # overall relative uncertainties: quadrature sum of the relative
            # uncertainty terms above and the spot measurement uncertainties

            rel_ext_uncertainty = pd.DataFrame(
                np.sqrt(
                    np.array(
                        t1
                        + t2
                        + std_conc_stds
                        + (rse_i_std[np.newaxis, :] / 100) ** 2
                        + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                    ).astype(np.float64)
                )
            )
            rel_int_uncertainty = pd.DataFrame(
                np.sqrt(
                    np.array(
                        t1
                        # +t2
                        # + std_conc_stds
                        + (rse_i_std[np.newaxis, :] / 100) ** 2
                        + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                    ).astype(np.float64)
                )
            )
            rel_ext_uncertainty.columns = [f"{a}_exterr" for a in self.analytes]
            unk_rel_ext_uncertainties_list.append(rel_ext_uncertainty)
            rel_int_uncertainty.columns = [f"{a}_interr" for a in self.analytes]
            unk_rel_int_uncertainties_list.append(rel_int_uncertainty)

        unk_rel_ext_uncertainties = pd.concat(unk_rel_ext_uncertainties_list)
        unk_rel_int_uncertainties = pd.concat(unk_rel_int_uncertainties_list)

        unknown_ext_uncertainties = pd.DataFrame(
            unk_rel_ext_uncertainties.values
            * self.unknown_concentrations.loc[:, self.analytes].values,
            columns=[f"{a}_exterr" for a in self.analytes],
            index=self.unknown_concentrations.index,
        )

        unknown_int_uncertainties = pd.DataFrame(
            unk_rel_int_uncertainties.values
            * self.unknown_concentrations.loc[:, self.analytes].values,
            columns=[f"{a}_interr" for a in self.analytes],
            index=self.unknown_concentrations.index,
        )

        self.unknown_concentrations = pd.concat(
            [
                self.unknown_concentrations,
                unknown_ext_uncertainties,
                unknown_int_uncertainties,
            ],
            axis="columns",
        )

    # accuracy check: compare measured SRM concentrations with accepted values
    # uses element names (analytes stripped of mass numbers) to look up SRM values
    def get_secondary_standard_accuracies(self):
        """
        calculate the accuracy of each secondary standard where accuracy is 100 * measured / accepted value

        Here `accepted` value is the GEOREM preferred value for that SRM analyte pair.

        """
        df_list = []

        for standard in self.secondary_standards:
            df = pd.DataFrame(
                100
                * self.SRM_concentrations.loc[standard, self.analytes]
                .replace("b.d.l.", np.nan)
                .values
                / self.standards_data.loc[standard, self.elements].values[
                    np.newaxis, :
                ],
                columns=self.analytes,
                index=self.SRM_concentrations.loc[standard, :].index,
            ).fillna("b.d.l.")
            df.insert(0, "Spot", self.SRM_concentrations.loc[standard, "Spot"])
            if "timestamp" in self.data.columns:
                df.insert(
                    0, "timestamp", self.SRM_concentrations.loc[standard, "timestamp"]
                )
            else:
                df.insert(0, "index", self.SRM_concentrations.loc[standard, "index"])

            df_list.append(df)

        self.SRM_accuracies = pd.concat(df_list)

__init__(name)

Parameters:

    name (str): The name of the experiment to be processed. Required.
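
A minimal construction sketch (the experiment name is hypothetical, and the import path is assumed from the source location shown below):

from lasertram.calc.calc import LaserCalc

# create an empty LaserCalc object for one experiment; every other
# attribute is populated by the later method calls documented below
calc = LaserCalc(name="2024-01-15_basalt_run")
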
Source code in lasertram\calc\calc.py
def __init__(self, name):
    """


    Args:
        name (str): The name of the experiment to be processed
    """
    # all attributes in relative chronological order that they are created in
    # if everything is done correctly. These all will get rewritten throughout the
    # data processing pipeline but this allows us to see what all the potential attributes
    # are going to be from the beginning (PEP convention)

    # for the math involved please see:

    # name for the lasercalc object
    # for notekeeping
    self.name = name

    # 2D pandas dataframe of standards reference material preferred compositions
    # from georem
    self.standards_data = None

    # List of standard reference materials in self.standards_data
    self.database_standards = None

    # list of standard reference material elements/oxides in self.standards_data
    self.standard_elements = None

    # list of standard reference material element/oxide 1 sigma uncertainties in self.standards_data
    self.standard_element_uncertainties = None

    # list of spot analyses for which concentrations are being calculated
    # this is the equivalent of self.data['Spot']
    self.spots = None

    # list of analytes for which concentrations are being calculated
    # these are column headers in self.data
    self.analytes = None

    # 1 sigma standard deviation of the calibration standard values
    # in self.data. Is len(analytes) in shape
    self.calibration_std_stdevs = None

    # 2D pandas dataframe that represents the metadata and data for numerous
    # spot analyses. Each row is the equivalent of a LaserTRAM.output_report
    # and has the following columns:
    # |timestamp|Spot|despiked|omitted_region|bkgd_start|bkgd_stop|int_start|int_stop|norm|norm_cps|analyte vals and uncertainties -->|
    # |---------|----|--------|--------------|----------|---------|---------|--------|----|--------|----------------------------------|
    self.data = None

    # element used as internal standard. NOT to be confused with analyte
    # e.g. self.int_std_element == 'Si' NOT '29Si'
    self.int_std_element = None

    # list of standard reference materials in found in self.data that are
    # also found in self.database_standards. This lets you know which standard reference
    # materials you can use as potential calibration standards
    self.potential_calibration_standards = None

    # list of samples in self.data with the self.potential_calibration_standards
    # removed
    self.samples_nostandards = None

    # list of elements for which concentrations are being calculated
    # this is the equivalent to self.analytes with the atomic masses
    # removed
    self.elements = None

    # string representing the standard reference material used
    # as the calibration standard for calculating concentrations
    self.calibration_std = None

    # 2D pandas dataframe which is a subset of self.data for only the
    # calibration standard data. This is essentially self.data.loc[self.calibration_std,:]
    self.calibration_std_data = None

    # mean calibration standard values for all analytes
    # equivalent of self.calibration_std_data.mean(axis = 0)
    self.calibration_std_means = None

    # calibration standard standard error of the mean for all analytes
    self.calibration_std_ses = None

    # 2D dataframe that is contains statistics for each analyte in self.calibration_std_data
    # columns are:
    # drift_correct | f_pval | f_value | f_crit_value | rmse | slope | intercept | mean | std_dev | percent_std_err
    # These stats are based on the following regression:
    # for each analyte
    # x = self.calibration_std_data.loc[:,'timestamp']
    # y = self.calibration_std_data.loc[:, analyte]

    # X = sm.add_constant(x)
    # Note the difference in argument order
    # model = sm.OLS(y, X).fit()
    # now generate predictions
    # ypred = model.predict(X)

    # calc rmse
    # RMSE = rmse(y, ypred)

    self.calibration_std_stats = None

    # the ratio of concentrations between an analyte and the internal standard
    # in the georem calibration standard values
    self.calibration_std_conc_ratios = None

    # list of standard reference materials that are not used as calibration standard
    # this is effectively self.potential_calibration_standards with self.calibration_std
    # removed
    self.secondary_standards = None

    # 2D pandas dataframe of calculated concentrations for all spots in self.secondary_standards and all
    # analytes in self.analytes. This is self.data.loc[self.secondary_standards,self.analytes].shape in shape
    self.SRM_concentrations = None

    # 2D pandas dataframe of calculated concentrations for all spots in self.spots and all
    # analytes in self.analytes. This is self.data.loc[self.spots,self.analytes].shape in shape
    self.unknown_concentrations = None

    # 2D pandas dataframe of calculated accuracies for all spots in self.secondary_standards and all
    # analytes in self.analytes. This is self.data.loc[self.secondary_standards,self.analytes].shape in shape
    # here accuracy is just 100*measured_concentration / georem_concentration
    self.SRM_accuracies = None

calculate_concentrations()

Calculates the concentration and uncertainty of all spots in the experiment using the user specified calibration standard and internal standard concentrations/uncertainties.
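
The core calculation is internal-standard normalization: for each analyte in a spot, concentration = Cn_u * (Cin_std / Ni_std) * Ni_u, where Cn_u is the internal-standard concentration in the unknown, Cin_std is the analyte/internal-standard concentration ratio in the calibration standard, Ni_std is the mean normalized intensity of the analyte in the calibration standard, and Ni_u is the normalized intensity of the analyte in the unknown spot. A minimal numeric sketch with made-up values:

# hypothetical values for a single analyte in a single spot
Cn_u = 250_000.0  # internal-standard concentration in the unknown (ppm)
Cin_std = 0.002   # analyte / internal-standard concentration ratio in the calibration standard
Ni_std = 0.0005   # mean normalized intensity of the analyte in the calibration standard
Ni_u = 0.0004     # normalized intensity of the analyte in the unknown spot

concentration = Cn_u * (Cin_std / Ni_std) * Ni_u
print(concentration)  # 400.0 ppm for these made-up numbers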

Source code in lasertram\calc\calc.py
def calculate_concentrations(self):
    """
    Calculates the concentration and uncertainty of all spots in the experiment
    using the user specified calibration standard and internal standard
    concentrations/uncertainties.

    """

    secondary_standards = self.potential_calibration_standards.copy()
    secondary_standards.remove(self.calibration_std)
    self.secondary_standards = secondary_standards
    secondary_standards_concentrations_list = []
    unknown_concentrations_list = []

    for sample in secondary_standards:
        Cn_u = self.standards_data.loc[
            sample,
            re.split(
                r"(\d+)",
                self.calibration_std_data["norm"].unique()[0],
            )[2],
        ]
        Cin_std = self.calibration_std_conc_ratios
        Ni_std = self.calibration_std_stats["mean"][self.analytes]
        Ni_u = self.data.loc[sample, self.analytes]

        concentrations = Cn_u * (Cin_std / Ni_std) * Ni_u

        drift_concentrations_list = []

        for j, analyte, slope, intercept, drift in zip(
            range(len(self.analytes)),
            self.analytes,
            self.calibration_std_stats["slope"],
            self.calibration_std_stats["intercept"],
            self.calibration_std_stats["drift_correct"],
        ):
            if "True" in drift:
                if "timestamp" in self.data.columns.tolist():
                    frac = (
                        slope
                        * np.array(
                            [
                                np.datetime64(d, "m")
                                for d in self.data.loc[sample, "timestamp"]
                            ]
                        ).astype(np.float64)
                        + intercept
                    )
                else:
                    frac = slope * self.data.loc[sample, "index"] + intercept

                Ni_std = frac

                drift_concentrations = Cn_u * (Cin_std[j] / Ni_std) * Ni_u[analyte]

                if isinstance(drift_concentrations, np.float64):
                    df = pd.DataFrame(
                        np.array([drift_concentrations]), columns=[analyte]
                    )

                else:
                    df = pd.DataFrame(drift_concentrations, columns=[analyte])

                drift_concentrations_list.append(df)

        if len(drift_concentrations_list) > 0:
            drift_df = pd.concat(drift_concentrations_list, axis="columns")

            if drift_df.shape[0] == 1:
                drift_df["sample"] = sample
                drift_df.set_index("sample", inplace=True)
        else:
            drift_df = pd.DataFrame()

        for column in drift_df.columns.tolist():
            if isinstance(concentrations, pd.Series):
                concentrations.loc[column] = drift_df[column].to_numpy()[0]

            else:
                concentrations[column] = drift_df[column].to_numpy()

        if isinstance(concentrations, pd.Series):
            concentrations = pd.DataFrame(concentrations).T
            concentrations["sample"] = sample
            concentrations.set_index("sample", inplace=True)

        secondary_standards_concentrations_list.append(concentrations)

    ###############################
    for sample in self.samples_nostandards:
        Cn_u = conversions.oxide_to_ppm(
            self.data.loc[sample, "int_std_comp"],
            self.data.loc[sample, "norm"].unique()[0],
        ).to_numpy()
        Cin_std = self.calibration_std_conc_ratios
        Ni_std = self.calibration_std_stats["mean"][self.analytes].to_numpy()
        Ni_u = self.data.loc[sample, self.analytes].to_numpy()

        concentrations = pd.DataFrame(
            Cn_u[:, np.newaxis] * (Cin_std / Ni_std) * Ni_u, columns=self.analytes
        )

        drift_concentrations_list = []

        for j, analyte, slope, intercept, drift in zip(
            range(len(self.analytes)),
            self.analytes,
            self.calibration_std_stats["slope"],
            self.calibration_std_stats["intercept"],
            self.calibration_std_stats["drift_correct"],
        ):
            if "True" in drift:
                if "timestamp" in self.data.columns.tolist():
                    frac = (
                        slope
                        * np.array(
                            [
                                np.datetime64(d, "m")
                                for d in self.data.loc[sample, "timestamp"]
                            ]
                        ).astype(np.float64)
                        + intercept
                    )
                else:
                    frac = slope * self.data.loc[sample, "index"] + intercept
                frac = np.array(frac)
                drift_concentrations = (
                    Cn_u[:, np.newaxis]
                    * (Cin_std[j] / frac)[:, np.newaxis]
                    * Ni_u[:, j][:, np.newaxis]
                )

                if isinstance(drift_concentrations, np.float64):
                    df = pd.DataFrame(
                        np.array([drift_concentrations]), columns=[analyte]
                    )

                else:
                    df = pd.DataFrame(drift_concentrations, columns=[analyte])

                drift_concentrations_list.append(df)

        if len(drift_concentrations_list) > 0:
            drift_df = pd.concat(drift_concentrations_list, axis="columns")

            if drift_df.shape[0] == 1:
                drift_df["sample"] = sample
                drift_df.set_index("sample", inplace=True)

        for column in drift_df.columns.tolist():
            if isinstance(concentrations, pd.Series):
                concentrations.loc[column] = drift_df[column].to_numpy()[0]

            else:
                concentrations[column] = drift_df[column].to_numpy()

        if isinstance(concentrations, pd.Series):
            concentrations = pd.DataFrame(concentrations).T
            concentrations["sample"] = sample
            concentrations.set_index("sample", inplace=True)

        unknown_concentrations_list.append(concentrations)

    self.SRM_concentrations = pd.concat(secondary_standards_concentrations_list)
    self.unknown_concentrations = pd.concat(unknown_concentrations_list)

    self.calculate_uncertainties()

    # ADD IN SPOT METADATA NOW

    self.unknown_concentrations[self.unknown_concentrations < 0] = "b.d.l."
    self.SRM_concentrations[self.SRM_concentrations < 0] = "b.d.l."

    self.SRM_concentrations.insert(
        0, "Spot", list(self.data.loc[self.secondary_standards, "Spot"])
    )

    if "timestamp" in self.data.columns.tolist():
        self.SRM_concentrations.insert(
            0,
            "timestamp",
            list(self.data.loc[self.secondary_standards, "timestamp"]),
        )
    else:
        self.SRM_concentrations.insert(
            0, "index", list(self.data.loc[self.secondary_standards, "index"])
        )
    self.unknown_concentrations.insert(
        0, "Spot", list(self.data.loc[self.samples_nostandards, "Spot"])
    )
    if "timestamp" in self.data.columns.tolist():
        self.unknown_concentrations.insert(
            0,
            "timestamp",
            list(self.data.loc[self.samples_nostandards, "timestamp"]),
        )
    else:
        self.unknown_concentrations.insert(
            0, "index", list(self.data.loc[self.samples_nostandards, "index"])
        )

    self.unknown_concentrations.index = [
        "unknown"
    ] * self.unknown_concentrations.shape[0]
    self.unknown_concentrations.index.name = "sample"

calculate_uncertainties()

Calculate the uncertainties for each analysis.
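
The external relative uncertainty for each analyte is a quadrature sum of the relative uncertainties of the terms in the concentration equation: the internal standard in the unknown, the internal standard and the analyte in the calibration standard, the calibration-standard normalized-ratio term (standard error of the mean, or regression RMSE where drift corrected), and the standard error of the spot measurement itself. A minimal numeric sketch with hypothetical values expressed as fractions:

import numpy as np

t1 = 0.010 ** 2         # internal standard in the unknown (or secondary standard)
t2 = 0.020 ** 2         # internal standard in the calibration standard
std_conc = 0.030 ** 2   # analyte concentration in the calibration standard
rse_i_std = 0.015 ** 2  # calibration standard normalized-ratio term
meas = 0.025 ** 2       # standard error of the spot measurement

rel_ext_uncertainty = np.sqrt(t1 + t2 + std_conc + rse_i_std + meas)
# the reported absolute uncertainty is rel_ext_uncertainty * concentration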

Source code in lasertram\calc\calc.py
def calculate_uncertainties(self):
    """
    Calculate the uncertainties for each analysis.

    """

    myuncertainties = [analyte + "_se" for analyte in self.analytes]
    srm_rel_ext_uncertainties_list = []
    unk_rel_ext_uncertainties_list = []
    srm_rel_int_uncertainties_list = []
    unk_rel_int_uncertainties_list = []
    # use RMSE of regression for elements where drift correction is applied rather than the standard error
    # of the mean of all the calibration standard normalized ratios
    rse_i_std = []
    for analyte in self.analytes:
        if "True" in self.calibration_std_stats.loc[analyte, "drift_correct"]:
            rse_i_std.append(
                100
                * self.calibration_std_stats.loc[analyte, "rmse"]
                / self.calibration_std_stats.loc[analyte, "mean"]
            )
        else:
            rse_i_std.append(
                self.calibration_std_stats.loc[analyte, "percent_std_err"]
            )

    rse_i_std = np.array(rse_i_std)

    for sample in self.secondary_standards:
        t1 = (
            self.standards_data.loc[sample, f"{self.int_std_element}_std"]
            / self.standards_data.loc[sample, f"{self.int_std_element}"]
        ) ** 2

        # concentration of internal standard in calibration standard uncertainties
        t2 = (
            self.standards_data.loc[
                self.calibration_std, f"{self.int_std_element}_std"
            ]
            / self.standards_data.loc[
                self.calibration_std, f"{self.int_std_element}"
            ]
        ) ** 2

        # concentration of each analyte in calibration standard uncertainties
        std_conc_stds = []
        for element in self.elements:
            # if our element is in the list of standard elements take the ratio
            if element in self.standard_elements:
                std_conc_stds.append(
                    (
                        self.standards_data.loc[
                            self.calibration_std, f"{element}_std"
                        ]
                        / self.standards_data.loc[self.calibration_std, element]
                    )
                    ** 2
                )

        std_conc_stds = np.array(std_conc_stds)

        # overall relative uncertainties: quadrature sum of the relative
        # uncertainty terms above and the spot measurement uncertainties

        rel_ext_uncertainty = pd.DataFrame(
            np.sqrt(
                np.array(
                    t1
                    + t2
                    + std_conc_stds
                    + (rse_i_std[np.newaxis, :] / 100) ** 2
                    + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                ).astype(np.float64)
            )
        )
        rel_int_uncertainty = pd.DataFrame(
            np.sqrt(
                np.array(
                    t1
                    # +t2
                    # + std_conc_stds
                    + (rse_i_std[np.newaxis, :] / 100) ** 2
                    + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                ).astype(np.float64)
            )
        )
        rel_ext_uncertainty.columns = [f"{a}_exterr" for a in self.analytes]
        srm_rel_ext_uncertainties_list.append(rel_ext_uncertainty)
        rel_int_uncertainty.columns = [f"{a}_interr" for a in self.analytes]
        srm_rel_int_uncertainties_list.append(rel_int_uncertainty)

    srm_rel_ext_uncertainties = pd.concat(srm_rel_ext_uncertainties_list)
    srm_rel_int_uncertainties = pd.concat(srm_rel_int_uncertainties_list)

    srm_ext_uncertainties = pd.DataFrame(
        srm_rel_ext_uncertainties.values
        * self.SRM_concentrations.loc[:, self.analytes].values,
        columns=[f"{a}_exterr" for a in self.analytes],
        index=self.SRM_concentrations.index,
    )
    srm_int_uncertainties = pd.DataFrame(
        srm_rel_int_uncertainties.values
        * self.SRM_concentrations.loc[:, self.analytes].values,
        columns=[f"{a}_interr" for a in self.analytes],
        index=self.SRM_concentrations.index,
    )

    self.SRM_concentrations = pd.concat(
        [self.SRM_concentrations, srm_ext_uncertainties, srm_int_uncertainties],
        axis="columns",
    )

    ######################################

    for sample in self.samples_nostandards:
        # concentration of internal standard in unknown uncertainties
        int_std_element = re.split(
            r"(\d+)", self.calibration_std_data["norm"].unique()[0]
        )[2]
        # concentration of internal standard in unknown uncertainties
        t1 = (self.data.loc[sample, "int_std_rel_unc"] / 100) ** 2
        t1 = np.array(t1)
        t1 = t1[:, np.newaxis]

        # concentration of internal standard in calibration standard uncertainties
        t2 = (
            self.standards_data.loc[self.calibration_std, f"{int_std_element}_std"]
            / self.standards_data.loc[self.calibration_std, f"{int_std_element}"]
        ) ** 2

        # concentration of each analyte in calibration standard uncertainties
        std_conc_stds = []
        for element in self.elements:
            # if our element is in the list of standard elements take the ratio
            if element in self.standard_elements:
                std_conc_stds.append(
                    (
                        self.standards_data.loc[
                            self.calibration_std, f"{element}_std"
                        ]
                        / self.standards_data.loc[self.calibration_std, element]
                    )
                    ** 2
                )

        std_conc_stds = np.array(std_conc_stds)

        # overall relative uncertainties: quadrature sum of the relative
        # uncertainty terms above and the spot measurement uncertainties

        rel_ext_uncertainty = pd.DataFrame(
            np.sqrt(
                np.array(
                    t1
                    + t2
                    + std_conc_stds
                    + (rse_i_std[np.newaxis, :] / 100) ** 2
                    + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                ).astype(np.float64)
            )
        )
        rel_int_uncertainty = pd.DataFrame(
            np.sqrt(
                np.array(
                    t1
                    # +t2
                    # + std_conc_stds
                    + (rse_i_std[np.newaxis, :] / 100) ** 2
                    + (self.data.loc[sample, myuncertainties].to_numpy() / 100) ** 2
                ).astype(np.float64)
            )
        )
        rel_ext_uncertainty.columns = [f"{a}_exterr" for a in self.analytes]
        unk_rel_ext_uncertainties_list.append(rel_ext_uncertainty)
        rel_int_uncertainty.columns = [f"{a}_interr" for a in self.analytes]
        unk_rel_int_uncertainties_list.append(rel_int_uncertainty)

    unk_rel_ext_uncertainties = pd.concat(unk_rel_ext_uncertainties_list)
    unk_rel_int_uncertainties = pd.concat(unk_rel_int_uncertainties_list)

    unknown_ext_uncertainties = pd.DataFrame(
        unk_rel_ext_uncertainties.values
        * self.unknown_concentrations.loc[:, self.analytes].values,
        columns=[f"{a}_exterr" for a in self.analytes],
        index=self.unknown_concentrations.index,
    )

    unknown_int_uncertainties = pd.DataFrame(
        unk_rel_int_uncertainties.values
        * self.unknown_concentrations.loc[:, self.analytes].values,
        columns=[f"{a}_interr" for a in self.analytes],
        index=self.unknown_concentrations.index,
    )

    self.unknown_concentrations = pd.concat(
        [
            self.unknown_concentrations,
            unknown_ext_uncertainties,
            unknown_int_uncertainties,
        ],
        axis="columns",
    )

drift_check(pval=0.01)

For each analyte in the experiment, perform a linear regression to assess whether or not drift in the mass spectrometer is happening at a significant level. Significance is determined by setting the pval threshold. If the regression is statistically significant, the analyte is flagged for drift correction in calculate_concentrations.

Parameters:

    pval (float, optional): significance threshold to reject the null hypothesis for drift correction. Defaults to 0.01.
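
A minimal usage sketch, assuming calc is a LaserCalc instance that has already been populated via get_SRM_comps, get_data, and set_calibration_standard:

# flag analytes whose calibration-standard signal changes significantly with time
calc.drift_check(pval=0.01)

# one row per analyte with the columns:
# drift_correct | f_pval | f_value | f_crit_value | rmse | slope | intercept | mean | std_dev | percent_std_err
print(calc.calibration_std_stats.head())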

Source code in lasertram\calc\calc.py
def drift_check(self, pval=0.01):
    """For each analyte in the experiment, perform a linear regression to
    assess whether or not drift in the mass spectrometer is happening at a
    significant level. Significance is determined by setting the `pval` threshold.
    If the regression is statistically significant, the analyte gets flagged
    for drift correction in `calculate_concentrations`.



    Parameters
    ----------
    pval : float, optional
        significance threshold to reject the null hypothesis for drift correction, by default 0.01
    """
    calib_std_rmses = []
    calib_std_slopes = []
    calib_std_intercepts = []
    drift_check = []

    f_pvals = []
    f_vals = []
    f_crits = []
    for analyte in self.analytes:
        # Getting regression statistics on analyte normalized ratios through time
        # for the calibration standard. This is what we use to check to see if it needs
        # to be drift corrected
        if "timestamp" in self.calibration_std_data.columns.tolist():
            # get an array of time values from the timestamp column;
            # np.datetime64 with "m" resolution yields minutes since the epoch
            x = np.array(
                [
                    np.datetime64(d, "m")
                    for d in self.calibration_std_data["timestamp"]
                ]
            ).astype(np.float64)
            # x = np.cumsum(np.diff(x))
            # x = np.insert(x, 0, 0).astype(np.float64)

        else:
            x = self.calibration_std_data["index"].to_numpy()

        y = self.calibration_std_data.loc[:, analyte].astype("float64")

        X = sm.add_constant(x)
        # Note the difference in argument order
        model = sm.OLS(y, X).fit()
        # now generate predictions
        ypred = model.predict(X)

        # calc rmse
        RMSE = rmse(y, ypred)

        calib_std_rmses.append(RMSE)

        if model.params.shape[0] < 2:
            calib_std_slopes.append(model.params.loc["x1"])
            calib_std_intercepts.append(0)

        else:
            calib_std_slopes.append(model.params.loc["x1"])
            calib_std_intercepts.append(model.params.loc["const"])

        # F-test: compare the regression F-statistic against the critical
        # value at the (1 - pval) confidence level to flag significant drift

        fvalue = model.fvalue
        f_vals.append(fvalue)
        f_pvalue = model.f_pvalue
        f_pvals.append(f_pvalue)
        fcrit = stats.f.ppf(q=1 - pval, dfn=len(x) - 1, dfd=len(y) - 1)
        f_crits.append(fcrit)
        if (f_pvalue < pval) and (fvalue > fcrit):
            drift = "True"
            drift_check.append(drift)
        else:
            drift = "False"
            drift_check.append(drift)

    self.calibration_std_stats = pd.DataFrame(
        {
            "drift_correct": drift_check,
            "f_pval": f_pvals,
            "f_value": f_vals,
            "f_crit_value": f_crits,
            "rmse": calib_std_rmses,
            "slope": calib_std_slopes,
            "intercept": calib_std_intercepts,
            "mean": self.calibration_std_means[self.analytes].to_numpy(),
            "std_dev": self.calibration_std_stdevs[self.analytes].to_numpy(),
            "percent_std_err": self.calibration_std_ses[self.analytes].to_numpy(),
        },
        index=self.analytes,
    )

get_SRM_comps(df)

load in a database of standard reference material compositions

Parameters:

    df (pandas DataFrame): pandas DataFrame of standard reference materials, where each row represents data for a standard reference material. The first column should be named "Standard". All other columns are for different elemental concentrations. Standard names must be exact names found in GEOREM: http://georem.mpch-mainz.gwdg.de/sample_query_pref.asp. Required.
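
A minimal sketch of the expected input shape, assuming calc is a LaserCalc instance; the standards listed and all values are placeholders, not real GEOREM data:

import pandas as pd

srm_df = pd.DataFrame(
    {
        "Standard": ["BCR-2G", "NIST-612"],  # names must match GEOREM exactly
        "Si": [253000.0, 336000.0],          # placeholder elemental concentrations
        "Si_std": [2000.0, 2000.0],          # matching 1-sigma uncertainty columns
        "Ti": [13500.0, 44.0],
        "Ti_std": [300.0, 2.0],
    }
)

calc.get_SRM_comps(srm_df)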

Source code in lasertram\calc\calc.py
def get_SRM_comps(self, df):
    """load in a database of standard reference material compositions

    Args:
        df (pandas DataFrame): pandas DataFrame of standard reference materials
    where each row represents data for a standard reference material.
    The first column should be named "Standard". All other columns are
    for different elemental concentrations. Standard names must be exact
    names found in GEOREM: http://georem.mpch-mainz.gwdg.de/sample_query_pref.asp
    """

    self.standards_data = df.set_index("Standard")
    self.database_standards = self.standards_data.index.unique().to_list()
    # Get a list of all of the elements supported in the published standard datasheet
    # Get a second list for the same elements but their corresponding uncertainty columns
    self.standard_elements = [
        analyte
        for analyte in self.standards_data.columns.tolist()
        if "_std" not in analyte
    ]
    self.standard_element_uncertainties = [
        analyte + "_std" for analyte in self.standard_elements
    ]

get_calibration_std_ratios()

For the calibration standard, calculate the concentration ratio between every analyte and the internal standard.
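
A minimal sketch, assuming calc is a LaserCalc instance whose standards database, data, and calibration standard have already been set. Each entry of the resulting array is an accepted analyte concentration divided by the accepted internal-standard concentration in the calibration standard:

calc.get_calibration_std_ratios()

# ratios for the analytes whose elements appear in the standards database, in analyte order
print(calc.calibration_std_conc_ratios)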

Source code in lasertram\calc\calc.py
def get_calibration_std_ratios(self):
    """
    For the calibration standard, calculate the concentration ratio between every analyte and the internal standard.
    """

    # For our calibration standard, calculate the concentration ratio
    # of each analyte to the element used as the internal standard
    std_conc_ratios = []

    for element in self.elements:
        if element in self.standard_elements:
            std_conc_ratios.append(
                self.standards_data.loc[self.calibration_std, element]
                / self.standards_data.loc[
                    self.calibration_std, self.int_std_element
                ]
            )

    # make our list an array for easier math going forward
    self.calibration_std_conc_ratios = np.array(std_conc_ratios)

get_data(df)

load in output from LaserTRAM for calculation of concentrations

Parameters:

    df (pandas DataFrame): a 2D pandas DataFrame representing numerous concatenated calls to LaserTRAM.make_output_report(). Required.
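
A minimal usage sketch, assuming calc is a LaserCalc instance; the CSV path is a hypothetical file of concatenated LaserTRAM output reports:

import pandas as pd

lt_output = pd.read_csv("my_lasertram_output.csv")  # hypothetical path
calc.get_data(lt_output)

# spots, analytes, and potential calibration standards are now populated
print(calc.potential_calibration_standards)
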
Source code in lasertram\calc\calc.py
def get_data(self, df):
    """load in output from `LaserTRAM` for calculation of concentrations

    Args:
        df (pandas DataFrame): a 2D pandas DataFrame representing numerous concatenated calls to `LaserTRAM.make_output_report()`

    """
    # drop rows where the first column is NaN
    # (output from the GUI includes such a row)
    df = df[df.iloc[:, 0].isna() == False]

    data = df.set_index("Spot")
    data.insert(loc=1, column="index", value=np.arange(1, len(data) + 1))

    self.spots = data.index.unique().dropna().tolist()

    # Check for potential calibration standards. This will let us know what our options
    # are for choosing calibration standards by looking for spots that have the same string
    # as the standard spreadsheet

    stds_column = [
        [std for std in self.database_standards if std in spot]
        for spot in self.spots
    ]

    stds_column = [["unknown"] if not l else l for l in stds_column]

    stds_column = [std for sublist in stds_column for std in sublist]

    # standards that can be used as calibrations standards (must have more than 1 analysis)
    # potential_standards = list(np.unique(stds_column))
    potential_standards = [
        std for std in np.unique(stds_column) if stds_column.count(std) > 1
    ]
    potential_standards.remove("unknown")

    # all of the samples in your input sheet that are NOT potential standards
    all_standards = list(np.unique(stds_column))
    all_standards.remove("unknown")

    data["sample"] = stds_column

    data.reset_index(inplace=True)
    data.set_index("sample", inplace=True)

    self.data = data
    self.potential_calibration_standards = potential_standards
    self.samples_nostandards = list(np.setdiff1d(stds_column, all_standards))

    self.analytes = [
        analyte
        for analyte in data.columns.tolist()
        if not (
            "_se" in analyte
            or "norm" in analyte
            or "index" in analyte
            or "Spot" in analyte
            or "wt%" in analyte
            or "1stdev%" in analyte
            or "start" in analyte
            or "stop" in analyte
            or "long" in analyte
            or "timestamp" in analyte
            or "despiked" in analyte
            or "omitted_region" in analyte
        )
    ]
    # elements without isotopes in the front
    self.elements = [re.split(r"(\d+)", analyte)[2] for analyte in self.analytes]

    # internal standard analyte from lasertram
    self.int_std_element = re.split(r"(\d+)", self.data["norm"].unique()[0])[2]

get_secondary_standard_accuracies()

calculate the accuracy of each secondary standard where accuracy is 100 * measured / accepted value

Here accepted value is the GEOREM preferred value for that SRM analyte pair.
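
A minimal sketch, assuming calc is a LaserCalc instance and calculate_concentrations has already been run:

calc.get_secondary_standard_accuracies()

# values near 100 mean the measured concentration is close to the GEOREM preferred value
print(calc.SRM_accuracies.head())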

Source code in lasertram\calc\calc.py
def get_secondary_standard_accuracies(self):
    """
    calculate the accuracy of each secondary standard where accuracy is 100 * measured / accepted value

    Here `accepted` value is the GEOREM preferred value for that SRM analyte pair.

    """
    df_list = []

    for standard in self.secondary_standards:
        df = pd.DataFrame(
            100
            * self.SRM_concentrations.loc[standard, self.analytes]
            .replace("b.d.l.", np.nan)
            .values
            / self.standards_data.loc[standard, self.elements].values[
                np.newaxis, :
            ],
            columns=self.analytes,
            index=self.SRM_concentrations.loc[standard, :].index,
        ).fillna("b.d.l.")
        df.insert(0, "Spot", self.SRM_concentrations.loc[standard, "Spot"])
        if "timestamp" in self.data.columns:
            df.insert(
                0, "timestamp", self.SRM_concentrations.loc[standard, "timestamp"]
            )
        else:
            df.insert(0, "index", self.SRM_concentrations.loc[standard, "index"])

        df_list.append(df)

    self.SRM_accuracies = pd.concat(df_list)

set_calibration_standard(std)

Assign which standard reference material will be the calibration standard for calculating concentrations.

Parameters:

    std (str): name of standard reference material (e.g., NIST-612, BCR-2G). Required.
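
A minimal sketch, assuming calc is a LaserCalc instance with its data loaded; BCR-2G is only an illustrative choice from potential_calibration_standards:

calc.set_calibration_standard("BCR-2G")

# mean, standard deviation, and standard error of the calibration standard are now available
print(calc.calibration_std_means.head())
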
Source code in lasertram\calc\calc.py
def set_calibration_standard(self, std):
    """Assign which standard reference material will be the calibration
    standard for calculating concentrations.

    Args:
        std (str): name of standard reference material (e.g., `NIST-612`, `BCR-2G`)
    """
    self.calibration_std = std

    self.calibration_std_data = self.data.loc[std, :]
    # Calibration standard information
    # mean
    self.calibration_std_means = self.calibration_std_data.loc[
        :, self.analytes
    ].mean()
    # std deviation
    self.calibration_std_stdevs = self.calibration_std_data.loc[
        :, self.analytes
    ].std()
    # relative standard error
    self.calibration_std_ses = 100 * (
        (self.calibration_std_stdevs / self.calibration_std_means)
        / np.sqrt(self.calibration_std_data.shape[0])
    )

set_int_std_concentrations(spots=None, concentrations=None, uncertainties=None)

Assign the concentration and uncertainty of the internal standard analyte to a series of spots.

Briefly...a linear change in the concentration value reflects a linear change in the calculated concentration.

Parameters:

    spots (pandas Series): pandas Series containing the names of the spots to have their internal standard concentration and uncertainty assigned. This is the Spot column from the output of LaserTRAM. Defaults to None.
    concentrations (array-like): values representing the internal standard concentration. Must be the same shape as spots. Defaults to None.
    uncertainties (array-like): values representing the internal standard relative uncertainty in percent. Must be the same shape as spots. Defaults to None.
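
A minimal sketch with placeholder values, assuming calc is a LaserCalc instance whose data have been loaded with get_data; the concentration of 52.5 is a hypothetical internal-standard value:

import numpy as np

unknown_spots = calc.data.loc["unknown", "Spot"]

calc.set_int_std_concentrations(
    spots=unknown_spots,
    concentrations=np.full(unknown_spots.shape[0], 52.5),  # placeholder concentration for every unknown spot
    uncertainties=np.full(unknown_spots.shape[0], 1.0),    # placeholder relative uncertainty in percent
)
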
Source code in lasertram\calc\calc.py
def set_int_std_concentrations(
    self,
    spots=None,
    concentrations=None,
    uncertainties=None,
):
    """Assign the concentration and uncertainty of the internal standard analyte to
    a series of spots.

    Briefly...a linear change in the concentration value reflects a linear change
    in the calculated concentration.

    Args:
        spots (pandas Series): pandas series containing the names of the spots to have their internal standard concentration and uncertainty assigned. This is the `Spot` column from the output of `LaserTRAM`.

        concentrations (array-like): values representing the internal standard concentration. Must be the same shape as `spots`.
        uncertainties (array-like): values representing the internal standard relative uncertainty in percent. Must be the same shape as `spots`.
    """
    if spots is None:
        spots = (self.data["Spot"],)
        concentrations = (np.full(self.data["Spot"].shape[0], 10),)
        uncertainties = (np.full(self.data["Spot"].shape[0], 1),)

    self.data["int_std_comp"] = 10.0
    self.data["int_std_rel_unc"] = 1.0
    df = self.data.reset_index().set_index("Spot")

    for spot, concentration, uncertainty in zip(
        spots, concentrations, uncertainties
    ):
        df.loc[spot, "int_std_comp"] = concentration
        df.loc[spot, "int_std_rel_unc"] = uncertainty

    self.data["int_std_comp"] = df["int_std_comp"].to_numpy()
    self.data["int_std_rel_unc"] = df["int_std_rel_unc"].to_numpy()

batch module: For batch processing operations in laserTRAM

process_spot(spot, raw_data, bkgd, keep, int_std, omit=None, despike=False, output_report=True)

A convenience function that runs the methods of the LaserTRAM class in sequence so a spot can be processed in an efficient and compact way.

Parameters:

spot (LaserTRAM spot object): an empty LaserTRAM spot object to be processed. Required.

raw_data (pandas DataFrame): the raw counts per second dataframe to be assigned to the spot. Shape is (m x n), where m is the number of cycles through the mass range. Required.

bkgd (tuple): (start, stop) pair of values corresponding to the analysis time where the background signal starts and stops. Required.

keep (tuple): (start, stop) pair of values corresponding to the analysis time where the interval signal for concentrations starts and stops. Required.

int_std (str): column name for the internal standard analyte (e.g., 29Si). Required.

omit (tuple): (start, stop) pair of values corresponding to the analysis time to be omitted from the keep interval. Defaults to None.

despike (bool): whether or not to despike all analyte signals using the standard deviation filter from LaserTRAM.despike_data(). Defaults to False.

output_report (bool): whether or not to create a 1-row pandas DataFrame output report. Defaults to True.
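
A minimal end-to-end sketch for a single spot. The import path for `batch` is inferred from the source location below, the interval bounds are placeholders in the same units as the `Time` column, and `29Si` is assumed to be one of the measured analytes:

```python
from lasertram import LaserTRAM, preprocessing
from lasertram.helpers import batch

raw_data = preprocessing.load_test_rawdata()
sample = "GSD-1G_-_1"

spot = LaserTRAM(name=sample)
batch.process_spot(
    spot,
    raw_data=raw_data.loc[sample, :],
    bkgd=(5, 10),    # illustrative background window
    keep=(25, 40),   # illustrative ablation window
    int_std="29Si",
    despike=False,
    output_report=True,
)
```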
Source code in lasertram\helpers\batch.py
def process_spot(
    spot,
    raw_data,
    bkgd,
    keep,
    int_std,
    omit=None,
    despike=False,
    output_report=True,
):
    """a function to incorporate all the methods of the `LaserTRAM` class
    so a spot can be processed in an efficient and compact way.

    Args:
        spot (LaserTRAM spot object): an empty `LaserTRAM` spot object to be processed
        raw_data (pandas DataFrame): the raw counts per second dataframe to be assigned to the spot. Shape is (m x n) where m is the number of cycles through the mass range
        bkgd (tuple): (start, stop) pair of values corresponding to the analysis time where the background signal starts and stops
        keep (tuple): (start, stop) pair of values corresponding to the analysis time where the interval signal for concentrations starts and stops
        int_std (str): column name for the internal standard analyte (e.g., 29Si)
        omit (tuple): (start, stop) pair of values corresponding to the analysis time to be omitted from the `keep` interval. Defaults to None.
        despike (bool, optional): Whether or not to despike all analyte signals using the standard deviation filter from `LaserTRAM.despike_data()`. Defaults to False
        output_report (bool, optional): Whether or not to create a 1-row pandas DataFrame output report. Defaults to True.


    """
    # assign data to the spot
    spot.get_data(raw_data)
    # despike the data if desired
    if despike is True:
        spot.despike_data(analyte_list="all")
    # assign the internal standard analyte
    spot.assign_int_std(int_std)
    # assign intervals for background and ablation signal
    spot.assign_intervals(bkgd=bkgd, keep=keep, omit=omit)
    # assign and save the median background values
    spot.get_bkgd_data()
    # remove the median background values from the ablation interval
    spot.subtract_bkgd()
    # calculate detection limits based off background values
    spot.get_detection_limits()
    # normalize the ablation interval to the internal standard analyte,
    # get the median values, and the standard error
    spot.normalize_interval()

    if output_report is True:
        spot.make_output_report()

conversions module: For converting wt% oxide to ppm

oxide_to_ppm(wt_percent, int_std)

Convert the concentration of the internal standard analyte from oxide weight percent to ppm for a 1D series of data.

Args:

wt_percent (array-like): the oxide values to be converted to ppm

int_std (str): the internal standard used in the experiment (e.g., '29Si', '43Ca', '47Ti')

Returns:

ppm (array-like): concentrations in ppm, the same shape as the wt_percent input
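
A quick sketch of the conversion; the import path is inferred from the source location below and the CaO value is illustrative:

```python
from lasertram.helpers import conversions

# convert 12.5 wt% CaO to ppm Ca, using 43Ca as the internal standard
ca_ppm = conversions.oxide_to_ppm(12.5, "43Ca")
```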

Source code in lasertram\helpers\conversions.py
def oxide_to_ppm(wt_percent, int_std):
    """
    Convert the concentration of the internal standard analyte from oxide weight
    percent to ppm for a 1D series of data.

    Args:
    wt_percent (array-like): the oxide values to be converted to ppm
    int_std (str): the internal standard used in the experiment (e.g., '29Si', '43Ca','47Ti')

    Returns:
    ppm (array-like): concentrations in ppm the same shape as the wt_percent input

    """

    el = [i for i in int_std if not i.isdigit()]

    if len(el) == 2:
        element = el[0] + el[1]

    else:
        element = el[0]

    oxides = [
        "SiO2",
        "TiO2",
        "Al2O3",
        "Cr2O3",
        "MnO",
        "FeO",
        "K2O",
        "CaO",
        "Na2O",
        "NiO",
        "MgO",
    ]

    for o in oxides:
        if element in o:
            oxide = o

    s = oxide.split("O")
    cat_subscript = s[0]
    an_subscript = s[1]

    cat_subscript = [i for i in cat_subscript if i.isdigit()]
    if cat_subscript:
        cat_subscript = int(cat_subscript[0])
    else:
        cat_subscript = 1

    an_subscript = [i for i in an_subscript if i.isdigit()]
    if an_subscript:
        an_subscript = int(an_subscript[0])
    else:
        an_subscript = 1

    ppm = 1e4 * (
        (wt_percent * mendeleev.element(element).atomic_weight * cat_subscript)
        / (
            mendeleev.element(element).atomic_weight
            + mendeleev.element("O").atomic_weight * an_subscript
        )
    )
    return ppm

plot_lasertram_uncertainties(spot, fig=None, ax=None, **kwargs)

plot a bar chart of analyte uncertainties related to the output from processing using the LaserTRAM module

Parameters

spot : LaserTRAM.spot
    the LaserTRAM.spot object to plot the uncertainties for
fig : matplotlib.Figure, optional
    The figure to apply the plot to, by default None
ax : matplotlib.Axes, optional
    the axis to apply the plot to, by default None

Returns

ax : matplotlib.Axes
    the axis holding the uncertainty bar chart
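
A short sketch, assuming `spot` is a `LaserTRAM` object that has already been fully processed (e.g., with `process_spot` above) so that `bkgd_subtract_std_err_rel` exists; the styling keyword arguments are illustrative:

```python
import matplotlib.pyplot as plt
from lasertram import plotting

ax = plotting.plot_lasertram_uncertainties(spot, color="gray", edgecolor="k")
ax.set_title(spot.name)
plt.show()
```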

Source code in lasertram\helpers\plotting.py
def plot_lasertram_uncertainties(spot, fig=None, ax=None, **kwargs):
    """plot a bar chart of analyte uncertainties related to the output from
    processing using the `LaserTRAM` module

    Parameters
    ----------
    spot : LaserTRAM.spot
        the `LaserTRAM.spot` object to plot the uncertainties for
    fig : matplotlib.Figure, optional
        The figure to apply the plot to, by default None
    ax : matplotlib.Axes, optional
        the axis to apply the plot to, by default None

    Returns
    -------
    ax
    """

    if fig is None:
        fig = plt.figure(figsize=(12, 3))
    else:
        fig = plt.gcf()

    if ax is None:
        ax = fig.add_subplot()

    ax.bar(x=spot.analytes, height=spot.bkgd_subtract_std_err_rel, **kwargs)

    labels = [analyte for analyte in spot.analytes]
    labels = [
        "$^{{{}}}${}".format(
            re.findall(r"\d+", label)[0],
            label.replace(re.findall(r"\d+", label)[0], ""),
        )
        for label in labels
    ]
    ax.set_xticks(ax.get_xticks())
    ax.set_xticklabels(labels, rotation=90)
    ax.set_ylabel("% SE")

    return ax

plot_timeseries_data(df, analytes='all', marker='', fig=None, ax=None, **kwargs)

Plot time-series data related to laser ablation ICP-MS analyses, typically where the x-axis is analysis time and y-axis is either counts per second data or data derived from it.

Parameters

df : pandas DataFrame
    the dataframe to be plotted
analytes : str or list, optional
    list of columns to be plotted from the dataframe, by default 'all'. Meant to be utilized when the input dataframe comes from a LaserTRAM spot object, so columns reflect only 'Time' and analytes.
marker : str, optional
    matplotlib marker to use for the plotting symbol, by default ''
fig : matplotlib.Figure, optional
    The figure to apply the plot to, by default None
ax : matplotlib.Axes, optional
    the axis to apply the plot to, by default None

Returns

ax : list of matplotlib.Axes
    two axes: the first holds the time-series curves, the second holds the legend

Ex:

from lasertram import preprocessing, plotting, LaserTRAM
import matplotlib.pyplot as plt
plt.style.use("lasertram.lasertram")

raw_data  = preprocessing.load_test_rawdata()

sample = 'GSD-1G_-_1'

ax = plotting.plot_timeseries_data(raw_data.loc[sample,:])
ax[0].set_title(sample)
ax[0].set_ylabel("cps")
ax[0].set_xlabel("Time (ms)")
Source code in lasertram\helpers\plotting.py
def plot_timeseries_data(
    df,
    analytes="all",
    marker="",
    fig=None,
    ax=None,
    **kwargs,
):
    """Plot time-series data related to laser ablation ICP-MS analyses,
        typically where the x-axis is analysis time and y-axis is either
        counts per second data or data derived from it.

    Parameters
    ----------
    df : pandas DataFrame
        the dataframe to be plotted
    analytes : str or list, optional
        list of columns to be plotted from the dataframe, by default 'all'.
        Meant to be utilized when the input dataframe comes from a LaserTRAM spot
        object, so columns reflect only 'Time' and analytes.
    marker : str, optional
        matplotlib marker to use for plotting symbol, by default ''
    fig : matplotlib.Figure, optional
        The figure to apply the plot to, by default None
    ax : matplotlib.Axes, optional
        the axis to apply the plot to, by default None

    Returns
    -------
    ax


    Ex:
    ```python
    from lasertram import preprocessing, plotting, LaserTRAM
    import matplotlib.pyplot as plt
    plt.style.use("lasertram.lasertram")

    raw_data  = preprocessing.load_test_rawdata()

    sample = 'GSD-1G_-_1'

    ax = plotting.plot_timeseries_data(raw_data.loc[sample,:])
    ax[0].set_title(sample)
    ax[0].set_ylabel("cps")
    ax[0].set_xlabel("Time (ms)")
    ```
    """

    if fig is None:
        fig = plt.figure(figsize=(8, 4))
    else:
        fig = plt.gcf()

    if ax is None:
        # setting up default axes
        rect = (0.1, 0.1, 0.8, 0.8)
        ax = [fig.add_axes(rect, label=f"{i}") for i in range(2)]

        horiz = [Size.AxesX(ax[0]), Size.Fixed(0.5), Size.AxesX(ax[1])]
        vert = [Size.AxesY(ax[0]), Size.Fixed(0.5), Size.AxesY(ax[1])]

        # divide the Axes rectangle into grid whose size is specified by horiz * vert
        divider = Divider(fig, rect, horiz, vert, aspect=False)
        ax[0].set_axes_locator(divider.new_locator(nx=0, ny=0))
        ax[1].set_axes_locator(divider.new_locator(nx=2, ny=0))

    if analytes == "all":
        analytes = [
            column
            for column in df.columns
            if ("timestamp" not in column) and ("Time" not in column)
        ]

        df.loc[:, ["Time"] + analytes].plot(
            x="Time",
            y=analytes,
            kind="line",
            marker=marker,
            ax=ax[0],
            lw=1,
            legend=False,
            **kwargs,
        )

    else:
        if isinstance(analytes, list):
            pass
        else:
            analytes = [analytes]

        df.loc[:, ["Time"] + analytes].plot(
            x="Time",
            y=analytes,
            kind="line",
            marker=marker,
            ax=ax[0],
            lw=1,
            legend=False,
            **kwargs,
        )

    ax[0].set_yscale("log")

    handles, labels = ax[0].get_legend_handles_labels()
    cols = 2
    ax[1].legend(
        handles, labels, loc="upper left", bbox_to_anchor=(0.15, 1.1), ncol=cols
    )
    ax[1].axis("off")

    return ax

extract_agilent_data(file)

read raw output from an Agilent quadrupole .csv file and return a pandas dataframe and metadata ready for processing with LaserTRAM

Parameters

file : path-like
    path to the csv file for data to be extracted

Returns

dict
    dictionary that contains the timestamp, filename, sample name, and data for preprocessing
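
A minimal sketch; the file path is hypothetical:

```python
from lasertram import preprocessing

out = preprocessing.extract_agilent_data("data/spot_01.csv")
print(out["sample"], out["timestamp"])
out["data"].head()
```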

Source code in lasertram\helpers\preprocessing.py
def extract_agilent_data(file):
    """
    read raw output from an Agilent quadrupole .csv file and
    return a pandas dataframe and metadata ready for processing with LaserTRAM

    Parameters
    ----------
    file : path-like
        path to the csv file for data to be extracted

    Returns
    -------
    dict
        dictionary that contains timestamp, filename, and data
        for preprocessing

    """
    # import data
    # extract sample name
    # extract timestamp
    # extract data and make headers ready for lasertram

    df = pd.read_csv(file, sep="\t", header=None)

    sample = df.iloc[0, 0].split("\\")[-1].split(".")[0].replace("_", "-")

    timestamp = parse(df.iloc[2, 0].split(" ")[7] + " " + df.iloc[2, 0].split(" ")[8])

    data = pd.DataFrame([sub.split(",") for sub in df.iloc[3:-1, 0]])

    header = data.iloc[0, :]
    data = data[1:]
    data.columns = header
    newcols = []
    for s in data.columns.tolist():
        l = re.findall(r"(\d+|[A-Za-z]+)", s)
        if "Time" in l:
            newcols.append(l[0])
        else:

            newcols.append(l[1] + l[0])
    data.columns = newcols

    return {"timestamp": timestamp, "file": file, "sample": sample, "data": data}

extract_thermo_data(file)

read raw output from a ThermoFisher quadrupole .csv file and return a pandas dataframe and metadata ready for processing with LaserTRAM

Parameters

file : path-like
    path to the csv file for data to be extracted

Returns

dict
    dictionary that contains the timestamp, filename, sample name, and data for preprocessing
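
The ThermoFisher analogue; the file path is again hypothetical:

```python
from lasertram import preprocessing

out = preprocessing.extract_thermo_data("data/spot_01.csv")
print(out["sample"], out["timestamp"])
out["data"].head()
```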

Source code in lasertram\helpers\preprocessing.py
def extract_thermo_data(file):
    """
    read raw output from a ThermoFisher quadrupole .csv file and
    return a pandas dataframe and metadata ready for processing with LaserTRAM

    Parameters
    ----------
    file : path-like
        path to the csv file for data to be extracted

    Returns
    -------
    dict
        dictionary that contains timestamp, filename, and data
        for preprocessing
    """

    # gets the top row in your csv and turns it into a pandas series
    top = pd.read_csv(file, nrows=0)
    # since it is only 1 long it is also the column name
    # extract that as a list
    sample = list(top.columns)

    # turn that list value to a string
    sample = str(sample[0])

    # because its a string it can be split
    # split at : removes the time stamp
    sample = sample.split(":")[0]

    # .strip() removes leading and trailing spaces
    sample = sample.strip()

    # replace middle spaces with _ because spaces are bad
    nospace = sample.replace(" ", "_")

    # get the timestamp by splitting the string by the previously
    # designated sample. Also drops the colon in front of the date
    timestamp = top.columns.tolist()[0].split(sample)[1:][0][1:]

    timestamp = parse(timestamp)

    # import data
    # remove the top rows. Double check that your header is the specified
    # amount of rows to be skipped in 'skiprows' argument
    data = pd.read_csv(file, skiprows=13)
    # drop empty column at the end
    data.drop(data.columns[len(data.columns) - 1], axis=1, inplace=True)

    # remove dwell time row beneath header row
    data = data.dropna()

    return {"timestamp": timestamp, "file": file, "sample": nospace, "data": data}

load_test_int_std_comps()

Load in internal standard comps used as examples in the following manuscript:

Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python library for time resolved analysis of laser ablation inductively coupled plasma mass spectrometry data"

Source code in lasertram\helpers\preprocessing.py
def load_test_int_std_comps():
    """
    Load in internal standard comps used as examples in the following manuscript:

    Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python
    library for time resolved analysis of laser ablation inductively
    coupled plasma mass spectrometry data "

    """

    current_path = Path(__file__).parent

    concentrations = pd.read_excel(
        current_path.parents[1]
        / "test_data"
        / "computers_and_geosciences_examples"
        / "example_internal_std.xlsx"
    )

    return concentrations

load_test_intervals()

Load in interval regions used as examples in the following manuscript:

Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python library for time resolved analysis of laser ablation inductively coupled plasma mass spectrometry data"

Source code in lasertram\helpers\preprocessing.py
def load_test_intervals():
    """
    Load in interval regions used as examples in the following manuscript:

    Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python
    library for time resolved analysis of laser ablation inductively
    coupled plasma mass spectrometry data "

    """

    current_path = Path(__file__).parent

    intervals = pd.read_excel(
        current_path.parents[1]
        / "test_data"
        / "computers_and_geosciences_examples"
        / "example_intervals.xlsx"
    ).set_index("Spot")

    return intervals

load_test_rawdata()

Load in raw data used as examples in the following manuscript:

Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python library for time resolved analysis of laser ablation inductively coupled plasma mass spectrometry data"
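
The three test-data loaders can be used together to reproduce the manuscript example inputs; a short sketch:

```python
from lasertram import preprocessing

raw_data = preprocessing.load_test_rawdata()             # LT_ready counts-per-second data
intervals = preprocessing.load_test_intervals()          # background/ablation intervals per spot
int_std_comps = preprocessing.load_test_int_std_comps()  # internal standard concentrations

raw_data.head()
```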

Source code in lasertram\helpers\preprocessing.py
def load_test_rawdata():
    """
    Load in raw data used as examples in the following manuscript:

    Lubbers, J., Kent, A., Russo, C. (2025) "lasertram: a Python
    library for time resolved analysis of laser ablation inductively
    coupled plasma mass spectrometry data "

    """

    current_path = Path(__file__).parent

    lt_ready = pd.read_excel(
        current_path.parents[1]
        / "test_data"
        / "computers_and_geosciences_examples"
        / "2022-05-10_LT_ready.xlsx"
    ).set_index("SampleLabel")

    return lt_ready

make_lt_ready_file(file, quad_type)

Take an individual csv file from either an Agilent or ThermoFisher quadrupole mass spectrometer and convert it to a pandas.DataFrame object ready for processing in LaserTRAM

Parameters

file : path-like
    path to the csv file.
quad_type : str
    "agilent" or "thermo"

Returns

pandas.DataFrame
    dataframe ready to be processed using LaserTRAM.
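
A minimal sketch; the path and quadrupole type are illustrative:

```python
from lasertram import preprocessing

lt_ready = preprocessing.make_lt_ready_file("data/spot_01.csv", quad_type="thermo")
lt_ready.head()
```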

Source code in lasertram\helpers\preprocessing.py
def make_lt_ready_file(file, quad_type):
    """
    Take an individual csv file from either an Agilent or ThermoFisher
    quadrupole mass spectrometer and convert it to a pandas.DataFrame
    object ready for processing in LaserTRAM

    Parameters
    ----------
    file : path-like
        path to the csv file.
    quad_type : str
        "agilent" or "thermo"

    Returns
    -------
    pandas.DataFrame
        dataframe ready to be processed using LaserTRAM.
    """

    if isinstance(file, Path):
        pass
    else:
        file = Path(file)

    assert file.name.endswith(".csv"), f"File '{file}' does not have a CSV extension."

    if quad_type == "thermo":
        temp = extract_thermo_data(file)

    elif quad_type == "agilent":
        temp = extract_agilent_data(file)
    else:
        temp = None

    if temp:
        outdf = temp["data"]
        outdf.insert(0, "SampleLabel", temp["sample"])
        outdf.insert(0, "timestamp", temp["timestamp"])

    else:
        raise ValueError("please choose either 'thermo' or 'agilent' for quad_type")

    return outdf

make_lt_ready_folder(folder, quad_type)

Take a folder of csv files from either an Agilent or ThermoFisher quadrupole mass spectrometer, and combine their data such that it is a pandas.DataFrame ready for processing in LaserTRAM

Parameters

folder : path-like
    path to the folder where the csv files are. This looks at all csv files, so make sure ONLY the data are in there.
quad_type : str
    "agilent" or "thermo"

Returns

pandas.DataFrame
    dataframe ready to be processed using LaserTRAM.
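
The folder version concatenates every csv in the directory in timestamp order; a sketch with a hypothetical path:

```python
from lasertram import preprocessing

lt_ready = preprocessing.make_lt_ready_folder("data/run_01/", quad_type="agilent")
lt_ready = lt_ready.set_index("SampleLabel")  # matches the LT_ready layout used elsewhere
lt_ready.head()
```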

Source code in lasertram\helpers\preprocessing.py
def make_lt_ready_folder(folder, quad_type):
    """
    Take a folder of csv files from either an Agilent or ThermoFisher
    quadrupole mass spectrometer, and combine their data such that it is
    a pandas.DataFrame ready for processing in LaserTRAM

    Parameters
    ----------
    folder : path-like
        path to the folder where the csv files are. This looks at all csv
        files so make sure ONLY the data are in there.
    quad_type : str
        "agilent" or "thermo"

    Returns
    -------
    pandas.DataFrame
        dataframe ready to be processed using LaserTRAM.
    """

    if isinstance(folder, Path):
        pass
    else:
        folder = Path(folder)
    assert (
        folder.is_dir()
    ), f"{folder} is not a directory, please choose a directory to your data .csv files"
    my_dict = {}
    for i in folder.glob("*.csv"):
        if quad_type == "thermo":
            temp = extract_thermo_data(i)

        elif quad_type == "agilent":
            temp = extract_agilent_data(i)

        my_dict[temp["timestamp"]] = temp

    my_dict = dict(sorted(my_dict.items()))

    outdf = pd.DataFrame()
    for timestamp in my_dict:
        samplelabel = pd.DataFrame(
            np.repeat(
                my_dict[timestamp]["sample"], my_dict[timestamp]["data"].shape[0]
            ),
            columns=["SampleLabel"],
            index=my_dict[timestamp]["data"].index,
        )
        ts = pd.DataFrame(
            np.repeat(
                my_dict[timestamp]["timestamp"], my_dict[timestamp]["data"].shape[0]
            ),
            columns=["timestamp"],
            index=my_dict[timestamp]["data"].index,
        )
        df = pd.concat([ts, samplelabel, my_dict[timestamp]["data"]], axis="columns")

        outdf = pd.concat([outdf, df])
        outdf.index = np.arange(outdf.shape[0], dtype=int)

    return outdf