Skip to content

agilent

Agilent binary parser adapted from Matlab code found at: https://github.com/chemplexity/chromatography/blob/master/Methods/Import/ImportAgilent.m

parse_agilent_ch

parse_agilent_ch(
    file_path, file_name=None, channel_name=None
) -> Chromatogram

Parses an Agilent .ch file into a Chromatogram object. Supports file versions 8, 81, 179, and 181.

Args:

  • file_path (str | Path | file-like): Path to the .ch file, or a file-like object obtained from a zipfile.
  • file_name (str, optional): Name of the file when file_path is a file-like object.
  • channel_name (str, optional): Override for the channel name (otherwise extracted from the filename).

Returns:

  • Chromatogram: Parsed chromatogram object.

Source code in src/chromstream/parsers/agilent.py
def parse_agilent_ch(file_path, file_name=None, channel_name=None) -> Chromatogram:
    """
    Parses Agilent .ch files to a Chromatogram object.
    Supports versions 8 (also reported as 80), 81, 179, 181.

    The header is read at fixed byte offsets that depend on the file
    version; the signal is then decoded and rescaled, and a linearly
    spaced time axis is built between the start and end times stored in
    the header.

    Args:
        file_path (str | Path | file-like): Path to the .ch file or file-like object from zipfile.
        file_name (str, optional): Name of the file when file_path is a file-like object.
        channel_name (str, optional): Override channel name (otherwise extracted from filename).
    Returns:
        Chromatogram: Parsed chromatogram object.
    Raises:
        ValueError: If the version string is not supported, or if no valid
            injection time could be parsed from the header.
    """
    f, channel, path_str, should_close = _prepare_file_input(
        file_path, file_name, channel_name
    )

    metadata = {}

    try:
        # The version string at the start of the file selects the header layout
        version = _read_pascal_string(f, "latin-1")
        metadata["version"] = version

        tic = np.array([])
        xmin = 0.0
        xmax = 0.0

        if version in ("8", "80"):
            # Byte offsets of the header strings for this layout
            offsets = {
                "Sample Name": 24,
                "Sample Description": 86,
                "Method Name": 228,
                "Operator": 148,
                "date": 178,
                "Instrument": 218,
                "Inlet": 208,
                "Signal Unit": 580,
            }
            encoding = "latin-1"

            # Start of the signal data; the stored value is converted with
            # (n - 1) * 512 (presumably a 1-based count of 512-byte blocks)
            f.seek(264)
            sig_offset_raw = struct.unpack(">i", f.read(4))[0]
            sig_offset = (sig_offset_raw - 1) * 512

            for key, off in offsets.items():
                f.seek(off)
                metadata[key] = _read_pascal_string(f, encoding)

            tic = _delta_compression(f, sig_offset)

            # Start/end times are big-endian int32 here; dividing by 60000
            # presumably converts milliseconds to minutes
            f.seek(282)
            xmin = struct.unpack(">i", f.read(4))[0] / 60000.0
            xmax = struct.unpack(">i", f.read(4))[0] / 60000.0

            # The value at 542 decides how the raw signal is scaled: a fixed
            # factor for 1/2/3, otherwise the intercept/slope stored at 636
            f.seek(542)
            header = struct.unpack(">i", f.read(4))[0]
            if header in (1, 2, 3):
                tic = tic * 1.33321110047553
            else:
                f.seek(636)
                intercept = struct.unpack(">d", f.read(8))[0]
                slope = struct.unpack(">d", f.read(8))[0]
                tic = tic * slope + intercept

        elif version == "81":
            # Same string offsets as versions 8/80
            offsets = {
                "Sample Name": 24,
                "Sample Description": 86,
                "Method Name": 228,
                "Operator": 148,
                "date": 178,
                "Instrument": 218,
                "Inlet": 208,
                "Signal Unit": 580,
            }
            encoding = "latin-1"

            f.seek(264)
            sig_offset_raw = struct.unpack(">i", f.read(4))[0]
            sig_offset = (sig_offset_raw - 1) * 512

            for key, off in offsets.items():
                f.seek(off)
                metadata[key] = _read_pascal_string(f, encoding)

            tic = _double_delta_compression(f, sig_offset)

            # Unlike versions 8/80, the times are big-endian float32 here
            f.seek(282)
            xmin = struct.unpack(">f", f.read(4))[0] / 60000.0
            xmax = struct.unpack(">f", f.read(4))[0] / 60000.0

            f.seek(636)
            intercept = struct.unpack(">d", f.read(8))[0]
            slope = struct.unpack(">d", f.read(8))[0]
            tic = tic * slope + intercept

        elif version in ("179", "181"):
            # Newer layout: different offsets, strings decoded as UTF-16-LE
            offsets = {
                "Sample Name": 858,
                "Sample Description": 1369,
                "Method Name": 2574,
                "Operator": 1880,
                "date": 2391,
                "Instrument": 2533,
                "Inlet": 2492,
                "Signal Unit": 4172,
            }
            encoding = "utf-16-le"

            f.seek(264)
            sig_offset_raw = struct.unpack(">i", f.read(4))[0]
            sig_offset = (sig_offset_raw - 1) * 512

            for key, off in offsets.items():
                f.seek(off)
                metadata[key] = _read_pascal_string(f, encoding)

            f.seek(282)
            xmin = struct.unpack(">f", f.read(4))[0] / 60000.0
            xmax = struct.unpack(">f", f.read(4))[0] / 60000.0

            f.seek(4724)
            intercept = struct.unpack(">d", f.read(8))[0]
            slope = struct.unpack(">d", f.read(8))[0]

            # The two versions differ only in how the signal is decoded
            if version == "179":
                tic = _double_array(f, sig_offset)
            else:
                tic = _double_delta_compression(f, sig_offset)

            tic = tic * slope + intercept

        else:
            raise ValueError(f"Unsupported Agilent version: {version}")

    finally:
        # Only close handles this function opened itself
        if should_close:
            f.close()

    # Create Time Array
    # np.linspace returns an empty array for 0 samples and [xmin] for a
    # single sample, so the time axis always matches the signal length.
    time = np.linspace(xmin, xmax, len(tic))

    # Build DataFrame
    df = pd.DataFrame({"Time": time, "Signal": tic})

    # Parse Date
    injection_time = _parse_date(metadata.get("date"))
    if pd.isna(injection_time):
        raise ValueError(f"Invalid injection time parsed from {path_str}")

    # Ensure time_unit
    if "time_unit" not in metadata:
        metadata["time_unit"] = "min"

    return Chromatogram(
        data=df,
        injection_time=injection_time,
        metadata=metadata,
        channel=channel,
        path=path_str,
    )

parse_agilent_dot_d

parse_agilent_dot_d(path_dir: Path) -> list[Chromatogram]

Given a path to an Agilent .d directory, parses all .ch chromatogram files in it and returns a list of Chromatogram objects.

Parameters:

  • path_dir (str | Path) –

    Path to the .d directory.

Returns:

  • list[Chromatogram]

    list[Chromatogram]: List of parsed Chromatogram objects.

Source code in src/chromstream/parsers/agilent.py
def parse_agilent_dot_d(path_dir: Path) -> list[Chromatogram]:
    """
    Given a path to an Agilent .d directory, parses all .ch chromatogram
    files directly inside it and returns a list of Chromatogram objects.

    Files are matched on a case-insensitive ``.ch`` suffix and parsed in
    sorted filename order, so the result order is deterministic.

    Args:
        path_dir (str | Path): Path to the .d directory.

    Returns:
        list[Chromatogram]: List of parsed Chromatogram objects. Empty
            (with a warning logged) when the directory holds no .ch files.

    Raises:
        ValueError: If path_dir is not an existing directory whose name
            ends with ".d".
    """
    # Accept plain strings as documented, not only Path objects
    path_dir = Path(path_dir)

    # if the dir doesn't end with .d or is not a directory, raise an error
    if not path_dir.is_dir() or not path_dir.name.lower().endswith(".d"):
        raise ValueError(f"Provided path is not a valid .d directory: {path_dir}")

    # os.listdir order is arbitrary, so sort for a reproducible result
    chrom_list = [
        parse_agilent_ch(path_dir / file)
        for file in sorted(os.listdir(path_dir))
        if file.lower().endswith(".ch")
    ]
    if not chrom_list:
        log.warning(f"No .ch files found in directory: {path_dir}")
    return chrom_list

parse_agilent_dx

parse_agilent_dx(file_path) -> list[Chromatogram]

Parses Agilent .dx files to a list of Chromatogram objects.

Parameters:

  • file_path (str | Path) –

    Path to the .dx file.

Returns:

  • list[Chromatogram]

    list[Chromatogram]: List of parsed Chromatogram objects.

Source code in src/chromstream/parsers/agilent.py
def parse_agilent_dx(file_path) -> list[Chromatogram]:
    """
    Reads every .ch trace stored in an Agilent .dx archive.

    The .dx file is opened as a zip archive. Channel names come from the
    mapping returned by ``_parse_acmd_channel_mapping``, looked up by the
    stem of each .ch member name; members without a match are parsed
    with no channel-name override.

    Args:
        file_path (str | Path): Path to the .dx file.

    Returns:
        list[Chromatogram]: List of parsed Chromatogram objects.

    Raises:
        ValueError: If file_path is not an existing file with a .dx suffix.
    """
    dx_path = Path(file_path)
    is_dx_file = (
        dx_path.exists() and dx_path.is_file() and dx_path.suffix.lower() == ".dx"
    )
    if not is_dx_file:
        raise ValueError(f"Provided path is not a valid .dx file: {file_path}")

    with zipfile.ZipFile(dx_path, "r") as archive:
        # Channel names are described by the .acmd file inside the archive
        trace_names = _parse_acmd_channel_mapping(archive)

        chromatograms = []
        for member in archive.namelist():
            if not member.lower().endswith(".ch"):
                continue

            # .ch members are typically named with the TraceId, so the stem
            # is used as the lookup key
            stem = Path(member).stem
            resolved_channel = trace_names.get(stem) if trace_names else None

            with archive.open(member) as handle:
                chromatograms.append(
                    parse_agilent_ch(
                        handle, file_name=member, channel_name=resolved_channel
                    )
                )

        if not chromatograms:
            log.warning(f"No .ch files found in .dx archive: {file_path}")
        return chromatograms