Skip to content

Documentation for MassSpec.py

MassSpec

IndependentMassSpectrometer

IndependentMassSpectrometer(
    ionisation_mode,
    chemicals,
    mz_noise=None,
    intensity_noise=None,
    spike_noise=None,
    isolation_transition_window="rectangular",
    isolation_transition_window_params=None,
    scan_duration=DEFAULT_SCAN_TIME_DICT,
    task_manager=None,
    skip_ms2_spectra_generation=False,
)

A class that represents (synchronous) mass spectrometry process. Independent here refers to how the intensity of each peak in a scan is independent of each other i.e. there's no ion supression effect.

Creates a mass spec object.

Parameters:

Name Type Description Default
ionisation_mode

POSITIVE or NEGATIVE

required
chemicals

a list of Chemical objects in the dataset

required
mz_noise

noise to apply to m/z values of generated peaks. Should be an instance of vimms.Noise.NoPeakNoise or others that inherit from it.

None
intensity_noise

noise to apply to intensity values of generated peaks. Should be an instance of vimms.Noise.NoPeakNoise or others that inherit from it.

None
spike_noise

spike noise in the generated spectra. Should be either None, or set an instance of vimms.Noise.UniformSpikeNoise if needed.

None
isolation_transition_window

transition window for isolating peaks

'rectangular'
isolation_transition_window_params

parameters for isolation

None
scan_duration

a dictionary of scan time for each MS level, or an instance of vimms.ChemicalSamplers.ScanTimeSampler.

DEFAULT_SCAN_TIME_DICT
task_manager

an instance of [vimms.MassSpec.TaskManager] to manage tasks, or None.

None
Source code in vimms/MassSpec.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def __init__(
    self,
    ionisation_mode,
    chemicals,
    mz_noise=None,
    intensity_noise=None,
    spike_noise=None,
    isolation_transition_window="rectangular",
    isolation_transition_window_params=None,
    scan_duration=DEFAULT_SCAN_TIME_DICT,
    task_manager=None,
    skip_ms2_spectra_generation=False,
):
    """
    Creates a mass spec object.

    Args:
        ionisation_mode: POSITIVE or NEGATIVE
        chemicals: a list of Chemical objects in the dataset
        mz_noise: noise to apply to m/z values of generated peaks.
                  Should be an instance of [vimms.Noise.NoPeakNoise][] or
                  others that inherit from it.
        intensity_noise: noise to apply to intensity values of generated peaks.
                         Should be an instance of [vimms.Noise.NoPeakNoise][] or
                         others that inherit from it.
        spike_noise: spike noise in the generated spectra. Should be either None, or
                     set an instance of [vimms.Noise.UniformSpikeNoise][] if needed.
        isolation_transition_window: transition window for isolating peaks
        isolation_transition_window_params: parameters for isolation
        scan_duration: a dictionary of scan time for each MS level, or
                       an instance of [vimms.ChemicalSamplers.ScanTimeSampler][].
        task_manager: an instance of [vimms.MassSpec.TaskManager] to manage tasks, or None.
    """

    # current scan index and internal time
    self.idx = INITIAL_SCAN_ID  # same as the real mass spec
    self.time = 0

    # current task queue
    self.task_manager = task_manager if task_manager is not None else TaskManager()
    self.environment = None

    self.events = Events(
        (
            self.MS_SCAN_ARRIVED,
            self.ACQUISITION_STREAM_OPENING,
            self.ACQUISITION_STREAM_CLOSED,
            self.STATE_CHANGED,
        )
    )
    self.event_dict = {
        self.MS_SCAN_ARRIVED: self.events.MsScanArrived,
        self.ACQUISITION_STREAM_OPENING: self.events.AcquisitionStreamOpening,  # noqa
        self.ACQUISITION_STREAM_CLOSED: self.events.AcquisitionStreamClosing,  # noqa
        self.STATE_CHANGED: self.events.StateChanged,
    }

    # the list of all chemicals in the dataset
    self.chemicals = ChemSet.to_chemset(chemicals, fast=True)
    self.ionisation_mode = ionisation_mode
    self.chem_data_collector = ChemDataCollector(self.ionisation_mode)

    # whether to add noise to the generated peaks, the default is no noise
    self.mz_noise = mz_noise
    self.intensity_noise = intensity_noise
    if self.mz_noise is None:
        self.mz_noise = NoPeakNoise()
    if self.intensity_noise is None:
        self.intensity_noise = NoPeakNoise()
    self.spike_noise = spike_noise

    self.fragmentation_events = []  # which chemicals produce which peaks

    self.isolation_transition_window = isolation_transition_window
    self.isolation_transition_window_params = isolation_transition_window_params  # noqa

    self.scan_duration_dict = scan_duration
    self.skip_ms2_spectra_generation = skip_ms2_spectra_generation

clear_event

clear_event(event_name)

Clears event handler for a given event name

Parameters:

Name Type Description Default
event_name

the event name

required

Returns: None

Source code in vimms/MassSpec.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
def clear_event(self, event_name):
    """
    Clears event handler for a given event name

    Args:
        event_name: the event name

    Returns: None

    """
    if event_name not in self.event_dict:
        raise ValueError("Unknown event name")
    e = self.event_dict[event_name]
    e.targets = []

clear_events

clear_events()

Clear event handlers

Returns: None

Source code in vimms/MassSpec.py
549
550
551
552
553
554
555
556
557
def clear_events(self):
    """
    Clear event handlers

    Returns: None

    """
    for key in self.event_dict:
        self.clear_event(key)

close

close()

Close this mass spec

Returns: None

Source code in vimms/MassSpec.py
574
575
576
577
578
579
580
581
def close(self):
    """
    Close this mass spec

    Returns: None

    """
    self.clear_events()

dispatch_scan

dispatch_scan(scan)

Notify the controller that a new scan has been generated at this point, the MS_SCAN_ARRIVED event handler in the controller is called and the processing queue will be updated with new sets of scan parameters to do.

Parameters:

Name Type Description Default
scan

a newly generated scan.

required

Returns: None

Source code in vimms/MassSpec.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def dispatch_scan(self, scan):
    """
    Notify the controller that a new scan has been generated
    at this point, the MS_SCAN_ARRIVED event handler in the
    controller is called and the processing queue will be updated
    with new sets of scan parameters to do.

    Args:
        scan: a newly generated scan.

    Returns: None

    """
    self.fire_event(self.MS_SCAN_ARRIVED, scan)

fire_event

fire_event(event_name, arg=None)

Simulates sending an event

Parameters:

Name Type Description Default
event_name

the event name

required
arg

the event parameter

None

Returns: None

Source code in vimms/MassSpec.py
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
def fire_event(self, event_name, arg=None):
    """
    Simulates sending an event

    Args:
        event_name: the event name
        arg: the event parameter

    Returns: None

    """
    if event_name not in self.event_dict:
        raise ValueError("Unknown event name")

    # pretend to fire the event
    # actually here we just runs the event handler method directly
    e = self.event_dict[event_name]
    if arg is not None:
        e(arg)
    else:
        e()

get_params

get_params()

Retrieves a new set of scan parameters from the processing queue

A new set of scan parameters from the queue if available,

Type Description

otherwise it returns nothing (default scan set in actual MS)

Source code in vimms/MassSpec.py
484
485
486
487
488
489
490
491
492
493
def get_params(self):
    """
    Retrieves a new set of scan parameters from the processing queue

    Returns: A new set of scan parameters from the queue if available,
             otherwise it returns nothing (default scan set in actual MS)

    """
    params_list = self.task_manager.to_send()
    return params_list

register_event

register_event(event_name, handler)

Register event handler

Parameters:

Name Type Description Default
event_name

the event name

required
handler

the event handler

required

Returns: None

Source code in vimms/MassSpec.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def register_event(self, event_name, handler):
    """
    Register event handler

    Args:
        event_name: the event name
        handler: the event handler

    Returns: None

    """
    if event_name not in self.event_dict:
        raise ValueError("Unknown event name")
    e = self.event_dict[event_name]
    e += handler  # register a new event handler for e

send_params

send_params(params_list)

Send parameters to the instrument.

In the real IAPI mass spec, we would send these params to the instrument, but here we just store them in the list of pending tasks to be processed later

Parameters:

Name Type Description Default
params_list

the list of scan parameters to send.

required

Returns: None

Source code in vimms/MassSpec.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
def send_params(self, params_list):
    """
    Send parameters to the instrument.

    In the real IAPI mass spec, we would send these params
    to the instrument, but here we just store them in the list of pending tasks
    to be processed later

    Args:
        params_list: the list of scan parameters to send.

    Returns: None

    """
    self.task_manager.add_pending(params_list)

step

step(params=None, call_controller=True)

Performs one step of a mass spectrometry process

Parameters:

Name Type Description Default
params

initial set of tasks from the controller

None
call_controller

whether to actually call the controller or not

True

Returns: a newly generated scan

Source code in vimms/MassSpec.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def step(self, params=None, call_controller=True):
    """
    Performs one step of a mass spectrometry process

    Args:
        params: initial set of tasks from the controller
        call_controller: whether to actually call the controller or not

    Returns: a newly generated scan

    """
    # if no params passed in, then try to get one from the queue
    if params is None:
        # Get the next set of params from the outgoing tasks.
        # This could return an empty list if there's nothing there.
        params_list = self.get_params()
    else:
        params_list = [params]  # initial param passed from the controller

    # Send params away. In the simulated case, no sending actually occurs,
    # instead we just track these params we've sent by adding them to
    # self.environment.pending_tasks
    if len(params_list) > 0:
        self.send_params(params_list)

    # pick up the last param that has been sent and generate a new scan
    new_scan = None
    if self.task_manager.pending_size() > 0:
        params = self.task_manager.pop_pending()

        # Make scan using the param that has been sent
        # Note that in the real mass spec, the scan generation
        # likely won't happen immediately after the param is sent.
        new_scan = self._get_scan(self.time, params)

        new_scan.scan_duration = self._increase_time(new_scan.ms_level, new_scan.rt, None)
        # dispatch the generated scan to controller
        if call_controller:
            self.dispatch_scan(new_scan)
    return new_scan

Scan

Scan(
    scan_id,
    mzs,
    intensities,
    ms_level,
    rt,
    scan_duration=None,
    scan_params=None,
    parent=None,
    fragevent=None,
)

A class to store scan information

Creates a scan

Parameters:

Name Type Description Default
scan_id

current scan id

required
mzs

an array of mz values

required
intensities

an array of intensity values

required
ms_level

the ms level of this scan

required
rt

the starting retention time of this scan

required
scan_duration

how long this scan takes, if known.

None
scan_params

the parameters used to generate this scan, if known

None
parent

parent precursor peak, if known

None
fragevent

fragmentation event associated to this scan, if any

None
Source code in vimms/MassSpec.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(
    self,
    scan_id,
    mzs,
    intensities,
    ms_level,
    rt,
    scan_duration=None,
    scan_params=None,
    parent=None,
    fragevent=None,
):
    """
    Creates a scan

    Args:
        scan_id: current scan id
        mzs: an array of mz values
        intensities: an array of intensity values
        ms_level: the ms level of this scan
        rt: the starting retention time of this scan
        scan_duration: how long this scan takes, if known.
        scan_params: the parameters used to generate this scan, if known
        parent: parent precursor peak, if known
        fragevent: fragmentation event associated to this scan, if any
    """
    assert len(mzs) == len(intensities)
    self.scan_id = scan_id

    # ensure that mzs and intensites are sorted by their mz values
    p = mzs.argsort()
    self.mzs = mzs[p]
    self.intensities = intensities[p]

    self.ms_level = ms_level
    self.rt = rt
    self.num_peaks = len(mzs)

    self.scan_duration = scan_duration
    self.scan_params = scan_params
    self.parent = parent
    self.fragevent = fragevent

ScanEvent

ScanEvent(
    chem,
    query_rt,
    ms_level,
    peaks,
    scan_id,
    parents_intensity=None,
    parent_adduct=None,
    parent_isotope=None,
    precursor_mz=None,
    isolation_window=None,
    scan_params=None,
)

A class to store fragmentation events. Mostly used for benchmarking purpose

Creates a fragmentation event

Parameters:

Name Type Description Default
chem

the chemical that were fragmented

required
query_rt

the time when fragmentation occurs

required
ms_level

MS level of fragmentation

required
peaks

the set of peaks produced during the fragmentation event

required
scan_id

the scan id linked to this fragmentation event

required
parents_intensity

the intensity of the chemical that was fragmented at the time it was fragmented

None
parent_adduct

the adduct that was fragmented of the chemical

None
parent_isotope

the isotope that was fragmented of the chemical

None
precursor_mz

the precursor mz of the scan

None
isolation_window

the isolation window of the scan

None
scan_params

the scan parameter settings that were used

None
Source code in vimms/MassSpec.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def __init__(
    self,
    chem,
    query_rt,
    ms_level,
    peaks,
    scan_id,
    parents_intensity=None,
    parent_adduct=None,
    parent_isotope=None,
    precursor_mz=None,
    isolation_window=None,
    scan_params=None,
):
    """
    Creates a fragmentation event

    Args:
        chem: the chemical that were fragmented
        query_rt: the time when fragmentation occurs
        ms_level: MS level of fragmentation
        peaks: the set of peaks produced during the fragmentation event
        scan_id: the scan id linked to this fragmentation event
        parents_intensity: the intensity of the chemical that was
                           fragmented at the time it was fragmented
        parent_adduct: the adduct that was fragmented of the chemical
        parent_isotope: the isotope that was fragmented of the chemical
        precursor_mz: the precursor mz of the scan
        isolation_window: the isolation window of the scan
        scan_params: the scan parameter settings that were used
    """
    self.chem = chem
    self.query_rt = query_rt
    self.ms_level = ms_level
    self.peaks = peaks
    self.scan_id = scan_id
    self.parents_intensity = parents_intensity  # only ms2
    self.parent_adduct = parent_adduct  # only ms2
    self.parent_isotope = parent_isotope  # only ms2
    self.precursor_mz = precursor_mz
    self.isolation_window = isolation_window
    self.scan_params = scan_params

ScanEventPeak

ScanEventPeak(mz, rt, intensity, ms_level)

A class to represent an empirical or sampled scan-level peak object

Creates a peak object

Parameters:

Name Type Description Default
mz

mass-to-charge value

required
rt

retention time value

required
intensity

intensity value

required
ms_level

MS level

required
Source code in vimms/MassSpec.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, mz, rt, intensity, ms_level):
    """
    Creates a peak object

    Args:
        mz: mass-to-charge value
        rt: retention time value
        intensity: intensity value
        ms_level: MS level
    """
    self.mz = mz
    self.rt = rt
    self.intensity = intensity
    self.ms_level = ms_level

TaskManager

TaskManager(buffer_size=5)

A class to track how many new tasks (scan commands) that we can send, given the buffer size of the mass spec.

Initialises the task manager.

At any point, this class will ensure that there is not more than this number of tasks enqueued on the mass spec, see https://github.com/thermofisherlsms/iapi/issues/22.

Parameters:

Name Type Description Default
buffer_size

maximum buffer or queue size on the mass spec.

5
Source code in vimms/MassSpec.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def __init__(self, buffer_size=5):
    """
    Initialises the task manager.

    At any point, this class will ensure that there is not more than this
    number of tasks enqueued on the mass spec, see
    https://github.com/thermofisherlsms/iapi/issues/22.

    Args:
        buffer_size: maximum buffer or queue size on the mass spec.
    """
    self.current_tasks = []
    self.pending_tasks = []
    self.buffer_size = buffer_size

add_current

add_current(tasks)

Add to the list of current tasks (ready to send)

Parameters:

Name Type Description Default
tasks

list of current tasks

required

Returns: None

Source code in vimms/MassSpec.py
211
212
213
214
215
216
217
218
219
220
221
def add_current(self, tasks):
    """
    Add to the list of current tasks (ready to send)

    Args:
        tasks: list of current tasks

    Returns: None

    """
    self.current_tasks.extend(tasks)

add_pending

add_pending(tasks)

Add to the list of pending tasks (sent but not received)

Parameters:

Name Type Description Default
tasks

list of pending tasks

required

Returns: None

Source code in vimms/MassSpec.py
223
224
225
226
227
228
229
230
231
232
233
def add_pending(self, tasks):
    """
    Add to the list of pending tasks (sent but not received)

    Args:
        tasks: list of pending tasks

    Returns: None

    """
    self.pending_tasks.extend(tasks)

current_size

current_size()

Get the size of current tasks

Returns: the size of current tasks

Source code in vimms/MassSpec.py
297
298
299
300
301
302
303
304
def current_size(self):
    """
    Get the size of current tasks

    Returns: the size of current tasks

    """
    return len(self.current_tasks)

peek_current

peek_current()

Get the first current task (ready to send) without removing it

Returns: a current task

Source code in vimms/MassSpec.py
279
280
281
282
283
284
285
286
def peek_current(self):
    """
    Get the first current task (ready to send) without removing it

    Returns: a current task

    """
    return self.current_tasks[0]

peek_pending

peek_pending()

Get the first pending task (sent but not received) without removing it

Returns: a pending task

Source code in vimms/MassSpec.py
288
289
290
291
292
293
294
295
def peek_pending(self):
    """
    Get the first pending task (sent but not received) without removing it

    Returns: a pending task

    """
    return self.pending_tasks[0]

pending_size

pending_size()

Get the size of pending tasks

Returns: the size of pending tasks

Source code in vimms/MassSpec.py
306
307
308
309
310
311
312
313
def pending_size(self):
    """
    Get the size of pending tasks

    Returns: the size of pending tasks

    """
    return len(self.pending_tasks)

pop_current

pop_current()

Remove the first current task (ready to send)

Returns: a current task

Source code in vimms/MassSpec.py
261
262
263
264
265
266
267
268
def pop_current(self):
    """
    Remove the first current task (ready to send)

    Returns: a current task

    """
    return self.current_tasks.pop(0)

pop_pending

pop_pending()

Remove the first pending task (sent but not received)

Returns: a pending task

Source code in vimms/MassSpec.py
270
271
272
273
274
275
276
277
def pop_pending(self):
    """
    Remove the first pending task (sent but not received)

    Returns: a pending task

    """
    return self.pending_tasks.pop(0)

remove_pending

remove_pending(completed_task)

Remove a completed task from the list of pending tasks

Parameters:

Name Type Description Default
completed_task

a newly completed task

required

Returns: None

Source code in vimms/MassSpec.py
235
236
237
238
239
240
241
242
243
244
245
def remove_pending(self, completed_task):
    """
    Remove a completed task from the list of pending tasks

    Args:
        completed_task: a newly completed task

    Returns: None

    """
    self.pending_tasks = [t for t in self.pending_tasks if t != completed_task]

to_send

to_send()

Select current tasks that could be sent to the mass spec, ensuring that buffer_size on the mass spec is not exceeded.

Returns: None

Source code in vimms/MassSpec.py
247
248
249
250
251
252
253
254
255
256
257
258
259
def to_send(self):
    """
    Select current tasks that could be sent to the mass spec,
    ensuring that buffer_size on the mass spec is not exceeded.

    Returns: None

    """
    batch_size = self.buffer_size - self.pending_size()
    batch = []
    while self.current_size() > 0 and len(batch) < batch_size:
        batch.append(self.pop_current())
    return batch