Commit

modify filenames to follow BIDS convention and create necessary metadata
Shakiba Moradi authored and Shakiba Moradi committed Jan 9, 2025
1 parent ae5fccf commit fa686e9
Showing 3 changed files with 302 additions and 1 deletion.
89 changes: 89 additions & 0 deletions scripts/create_bids_structure.py
@@ -0,0 +1,89 @@
"""
This script automates the conversion of an fNIRS dataset into a BIDS-compliant format.
Main Functions:
---------------
1. Parse a CSV mapping file (`mapping_csv`) to retrieve file relationships and metadata.
2. Organize and rename SNIRF files into a BIDS-compatible structure.
3. Generate required BIDS files such as `scans.tsv`, coordinate system files, and `dataset_description.json`.
4. Handle optional extra metadata for inclusion in the BIDS dataset.
Arguments:
----------
- mapping_csv: str
The path to a CSV file containing the mapping of SNIRF files to BIDS-compatible filenames.
- dataset_path: str
The directory containing the input dataset.
- --extra_meta_data: str (optional)
A JSON file containing additional metadata to be added to the BIDS dataset.
Key Steps:
----------
1. Create the `bids` directory within the specified dataset path if it does not exist.
2. Parse and process existing `*_scans.tsv` files to merge acquisition times into the mapping.
3. Generate standardized BIDS filenames for the SNIRF files using `create_bids_standard_filenames`.
4. Rename and copy SNIRF files into the appropriate BIDS directory structure.
5. Validate and recursively populate the BIDS structure using `snirf2bids`.
6. Add optional metadata from the `extra_meta_data` argument to the dataset description.
Example Usage:
--------------
python create_bids_structure.py mapping.csv dataset_directory --extra_meta_data extra_metadata.json
"""

import os
import pandas as pd
from cedalion.io.bids import create_bids_standard_filenames, copy_rename_snirf, create_scan_files, find_files_with_pattern, create_data_description, check_coord_files
import snirf2bids as s2b
import argparse

# Set up argument parser
parser = argparse.ArgumentParser(description="Create BIDS dataset.")

# Define expected arguments
parser.add_argument("mapping_csv", type=str, help="The output of dataset parser")
parser.add_argument("dataset_path", type=str, help="your dataset directory")
parser.add_argument("--extra_meta_data", type=str, help="your extra metadata")
# Parse the arguments
args = parser.parse_args()

snirf2bids_mapping_df_path = args.mapping_csv

dataset_path = args.dataset_path


bids_dir = os.path.join(dataset_path, "bids")
if not os.path.exists(bids_dir):
    os.makedirs(bids_dir)

snirf2bids_mapping_df = pd.read_csv(snirf2bids_mapping_df_path, dtype=str)
snirf2bids_mapping_df["record_name"] = snirf2bids_mapping_df["current_name"].apply(lambda x: os.path.basename(x))

scan_paths = find_files_with_pattern(dataset_path, "*_scans.tsv")
scan_dfs = [pd.read_csv(file, sep='\t') for file in scan_paths]
if len(scan_dfs) != 0:
    scan_df = pd.concat(scan_dfs, ignore_index=True)
    scan_df.drop_duplicates(subset="filename", inplace=True)
    scan_df["filename"] = scan_df["filename"].apply(lambda x: str(os.path.basename(x)).replace(".snirf", ""))
    scan_df = scan_df.rename(columns={'filename': 'record_name'})

    snirf2bids_mapping_df = pd.merge(snirf2bids_mapping_df, scan_df, on="record_name", how="left")
else:
    snirf2bids_mapping_df["acq_time"] = None

snirf2bids_mapping_df[["bids_name", "parent_path"]] = snirf2bids_mapping_df.apply(create_bids_standard_filenames, axis=1, result_type='expand')

snirf2bids_mapping_df["status"] = snirf2bids_mapping_df.apply(copy_rename_snirf, axis=1, args=(dataset_path, bids_dir))

s2b.snirf2bids_recurse(bids_dir)

scan_df = snirf2bids_mapping_df[snirf2bids_mapping_df['status'] != "removed"]
scan_df = scan_df[["sub", "ses", "bids_name", "acq_time"]]
scan_df = scan_df.groupby(["sub", "ses"])
scan_df.apply(lambda group: create_scan_files(group, bids_dir))

extra_meta_data = args.extra_meta_data
create_data_description(dataset_path, bids_dir, extra_meta_data)

check_coord_files(bids_dir)
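
# Sketch of the expected output layout under <dataset_path>/bids
# (hypothetical subject/session labels; snirf2bids adds further sidecar
# files next to each SNIRF recording):
#
#   bids/
#       dataset_description.json
#       sub-01/
#           ses-01/
#               sub-01_ses-01_scans.tsv
#               nirs/
#                   sub-01_ses-01_task-rest_run-1_nirs.snirf
#                   ...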

104 changes: 104 additions & 0 deletions scripts/parse_dataset.py
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Command line tool
- given a directory, read in all the SNIRF file names
- use that information to populate a CSV file containing as much SNIRF info as possible
- the user then fills in any missing/incorrect information
1. take in a directory path - read via input
2. walk through the directory with os.walk and find all SNIRF files
3. check the names for any relevant BIDS fields - check both parent folders and the file name
4. create a dictionary for each file with the BIDS fields filled in - use None for fields not present in the current filename
5. create a CSV file with all the desired fields that will then be used to generate the BIDS dataset
6. save this CSV file in the dataset directory
@author: lauracarlton
"""
import os
import pandas as pd
from cedalion.io.bids import check_for_bids_field

column_names = ["current_name",
                "sub",
                "ses",
                "task",
                "run",
                "acq"]


snirf2bids_mapping_df = pd.DataFrame(columns=column_names)


dataset_path = input('Please enter the path to your dataset: ')

#% IDENTIFY ALL SNIRF FILES IN THE DIRECTORY AND THEIR PATH

file_list = []
for dirpath, dirnames, filenames in os.walk(dataset_path):

    for filename in filenames:

        if filename.endswith('.snirf'):
            # get the path of the file relative to the dataset root
            relative_path = os.path.relpath(dirpath, dataset_path)

            # get each part of the path
            parent_folders = relative_path.split(os.sep)

            # append the filename without its extension
            filename_without_ext = os.path.splitext(filename)[0]
            parent_folders.append(filename_without_ext)

            # add to the list of file paths
            file_list.append(parent_folders)


#% CHECK EACH FILE TO GATHER INFO TO POPULATE THE MAPPING_DF


for path_parts in file_list:
    try:
        # check for sub
        subject = check_for_bids_field(path_parts, 'sub')

        # check for session
        ses = check_for_bids_field(path_parts, 'ses')

        # check for run
        run = check_for_bids_field(path_parts, 'run')

        # check for task
        task = check_for_bids_field(path_parts, 'task')

        # check for acq
        acq = check_for_bids_field(path_parts, 'acq')

        bids_dict = {"current_name": "/".join(path_parts),
                     "sub": subject,
                     "ses": ses,
                     "run": run,
                     "task": task,
                     "acq": acq
                     }
        snirf2bids_mapping_df = pd.concat([snirf2bids_mapping_df, pd.DataFrame([bids_dict])], ignore_index=True)
    except Exception:
        # skip files whose names cannot be parsed into BIDS fields
        continue



snirf2bids_mapping_df.to_csv(os.path.join(dataset_path, 'snirf2BIDS_mapping.csv'), index=None)











110 changes: 109 additions & 1 deletion src/cedalion/io/bids.py
@@ -1,7 +1,115 @@
from pathlib import Path

import os
import pandas as pd
import shutil

import cedalion
import cedalion.io
import json


def read_events_from_tsv(fname: str | Path):
    return pd.read_csv(fname, delimiter="\t")


def check_for_bids_field(path_parts: list,
                         field: str):

    field_parts = [part for part in path_parts if field in part]
    if len(field_parts) == 0:
        value = None
    else:
        # assume the lowest directory level supersedes any higher one
        find_value = field_parts[-1].split('_')
        value = [vals for vals in find_value if field in vals][0]
        value = value.split('-')[1]

    return value
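
# For example (illustrative values):
#   check_for_bids_field(["sub-01", "ses-02", "sub-01_ses-02_task-rest_nirs"], "task")
#   returns "rest"; a field that appears nowhere in the path parts returns None.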

def find_files_with_pattern(start_dir, pattern):
    start_path = Path(start_dir)
    return [str(file) for file in start_path.rglob(pattern)]

def create_bids_standard_filenames(row):
    name_str = "sub-" + str(row["sub"])
    parent_path = name_str
    if not pd.isna(row["ses"]):
        name_str += "_ses-" + str(row.ses)
        parent_path = os.path.join(parent_path, "ses-" + str(row.ses))

    name_str += "_task-" + str(row.task)

    if not pd.isna(row["acq"]):
        name_str += "_acq-" + str(row.acq)

    if not pd.isna(row["run"]):
        name_str += "_run-" + str(row.run)

    name_str += "_nirs.snirf"
    parent_path = os.path.join(parent_path, "nirs")

    return name_str, parent_path
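
# For a row with sub="01", ses=NaN, task="rest", acq=NaN, run="1" (hypothetical
# values), this would return ("sub-01_task-rest_run-1_nirs.snirf", "sub-01/nirs")
# on a POSIX system.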


def copy_rename_snirf(row, dataset_path, bids_dir):

    # path of the source file (the file to be copied and renamed)
    source_file = os.path.join(dataset_path, row["current_name"] + ".snirf")

    # recordings without stimulus events are not copied into the BIDS structure
    recording = cedalion.io.read_snirf(source_file)[0]
    if len(recording.stim) != 0:
        destination_folder = os.path.join(bids_dir, row["parent_path"])
        if not os.path.exists(destination_folder):
            os.makedirs(destination_folder)
        destination_file = os.path.join(destination_folder, row["bids_name"])
        shutil.copy(source_file, destination_file)
        return "copied"
    else:
        return "removed"


def check_coord_files(bids_dir):
    results = find_files_with_pattern(bids_dir, "*_coordsystem.json")
    for coord_file in results:
        with open(coord_file, 'r') as file:
            data = json.load(file)
        if data["NIRSCoordinateSystem"] == "":
            data["NIRSCoordinateSystem"] = "Other"
            with open(coord_file, 'w') as json_file:
                json.dump(data, json_file, indent=4)

def create_scan_files(group_df, bids_dir):
    sub, ses = group_df.name
    tsv_df = group_df[["bids_name", "acq_time"]]
    tsv_df = tsv_df.rename(columns={'bids_name': 'filename'})
    tsv_df["filename"] = "nirs/" + tsv_df["filename"]
    if not pd.isna(ses):
        filename = "sub-" + str(sub) + "_ses-" + str(ses) + "_scans.tsv"
        path_to_save = os.path.join(bids_dir, "sub-" + str(sub), "ses-" + str(ses), filename)
    else:
        filename = "sub-" + str(sub) + "_scans.tsv"
        path_to_save = os.path.join(bids_dir, "sub-" + str(sub), filename)
    tsv_df.to_csv(path_to_save, sep='\t', index=False)
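
# The resulting *_scans.tsv would contain rows roughly like this
# (hypothetical filename and acquisition time, tab-separated):
#   filename                                    acq_time
#   nirs/sub-01_task-rest_run-1_nirs.snirf      2024-01-01T10:00:00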


def create_data_description(dataset_path, bids_dir, extra_meta_data=None):
    result = find_files_with_pattern(dataset_path, "dataset_description.json")
    data_description_keys = ["Name", "DatasetType", "EthicsApprovals", "ReferencesAndLinks", "Funding"]
    data_des = {}
    if extra_meta_data is not None:
        with open(extra_meta_data, 'r') as file:
            data = json.load(file)
        data = {key: value for key, value in data.items() if value != ''}
        data_des.update({key: data[key] for key in data_description_keys if key in data})
    if len(result) != 0:
        with open(result[0], 'r') as file:
            data_des.update(json.load(file))
        data_des = {key: value for key, value in data_des.items() if value != ''}

    if "Name" not in data_des:
        name = os.path.basename(dataset_path)
        data_des["Name"] = name
    if "BIDSVersion" not in data_des:
        data_des["BIDSVersion"] = '1.10.0'

    with open(os.path.join(bids_dir, "dataset_description.json"), 'w') as json_file:
        json.dump(data_des, json_file, indent=4)
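
# A minimal dataset_description.json written by this function could look like
# this (hypothetical dataset name; BIDSVersion falls back to "1.10.0"):
# {
#     "Name": "my_dataset",
#     "BIDSVersion": "1.10.0"
# }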
