Example: Patching L1 Files

This notebook demonstrates how to patch a set of L1 files from specific sources, using the Time Series Database to select the files.

[ ]:
import os
from pathlib import Path
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from database.modules.utils.tsdb import TSDB
from kpfpipe.models.level0 import KPF0
from kpfpipe.models.level1 import KPF1
from kpfpipe.models.level2 import KPF2
from modules.Utils.kpf_parse import HeaderParse, get_datecode
%matplotlib inline

Select observations of the following types in the following date range.

[ ]:
# Date range passed to the database query below.
start_date = datetime(year=2024, month=9, day=1)
end_date = datetime(year=2024, month=11, day=1)
# Only observations whose 'Source' is in this list are kept for patching.
keep = ['Etalon', 'Star', 'LFC', 'Sun']

Get a dataframe with the corresponding observations.

[ ]:
# Columns requested from the time series database. 'Source' is used by the
# filter below; 'ObsID' is what run_parallel() iterates over.
cols = ['ObsID', 'Source', 'L0_filename', 'L1_filename', 'L2_filename']
myDB = TSDB(backend='psql')
df = myDB.dataframe_from_db(
    columns=cols,
    start_date=start_date,
    end_date=end_date,
)
# Keep only the rows whose Source is listed in `keep`; .copy() gives an
# independent dataframe rather than a slice of the query result.
df = df.loc[df['Source'].isin(keep)].copy()

Define the patching operation (computing the READSPED keyword and saving it to each L1 file) and a method to parallelize execution.

[ ]:
def patch_one_file(obsid: str):
    """
    Patch one L1 file by adding the READSPED keyword.

    The L1 file at /data/L1/<datecode>/<obsid>_L1.fits is loaded, the read
    speed is obtained via HeaderParse(L1, "PRIMARY").get_read_speed(), the
    result is stored in the READSPED keyword of the PRIMARY header, and the
    file is written back to the same path (i.e. it is modified in place).

    Args:
        obsid: Observation ID used to build the L1 file path.

    Returns:
        (obsid, None) on success, or (obsid, message) on failure, where
        message is a non-empty string of the form "<ExceptionType>: <detail>".
        Exceptions are never propagated to the caller.
    """
    try:
        datecode = get_datecode(obsid)
        L1_fn = f"/data/L1/{datecode}/{obsid}_L1.fits"
        L1 = KPF1.from_fits(L1_fn)
        header = HeaderParse(L1, "PRIMARY")
        readsped = header.get_read_speed()
        # Only the first element of the get_read_speed() result is stored;
        # presumably this is the category label (see the keyword comment) --
        # confirm against HeaderParse.get_read_speed.
        L1.header["PRIMARY"]["READSPED"] = (readsped[0], "Categorization of read speed")
        L1.to_fits(L1_fn)
    except Exception as e:
        # Worker boundary: report the failure instead of raising so one bad
        # file does not abort the whole batch. The exception type is included
        # because str(e) alone can be empty (e.g. an exception raised without
        # a message), which a truthiness check on the result would mistake
        # for success.
        return (obsid, f"{type(e).__name__}: {e}")
    return (obsid, None)

def run_parallel(df, n_workers=None, chunksize=16):
    """
    Run patch_one_file() on every ObsID in `df` using a process pool.

    Failures are reported through tqdm.write() as they arrive and a one-line
    summary is printed at the end. Results are consumed in completion order,
    not input order.

    Args:
        df: Dataframe with an 'ObsID' column; values are converted to str.
        n_workers: Number of worker processes. Defaults to half of the
            available CPUs (at least 1) when None or 0.
        chunksize: Number of ObsIDs handed to a worker at a time.

    Returns:
        None.
    """
    n_workers = n_workers or max(1, cpu_count() // 2)  # tune for your box
    obsids = df["ObsID"].astype(str).to_numpy()

    # Nothing to patch: do not start worker processes for an empty selection.
    if len(obsids) == 0:
        print("Completed without errors.")
        return

    errors = 0
    with Pool(processes=n_workers) as pool:
        results = pool.imap_unordered(patch_one_file, obsids, chunksize=chunksize)
        for obsid, err in tqdm(
            results,
            total=len(obsids),
            desc="Processing L1 files",
        ):
            # patch_one_file() signals success with None; compare against
            # None explicitly so an empty error message still counts as a
            # failure.
            if err is not None:
                errors += 1
                tqdm.write(f"ERROR [{obsid}]: {err}")

    print(f"Completed with {errors} error(s)." if errors else "Completed without errors.")

Change the line below to `if True:` to patch files of the specified types in the specified date range.

[ ]:
# Safety switch: disabled by default so that running the notebook top to
# bottom does not rewrite L1 files on disk. Change to `if True:` to patch.
if False:
    run_parallel(df, n_workers=16, chunksize=16)