packages = ["biopython"]

AB1 File Analyzer

ドラッグ&ドロップまたはクリックしてAB1ファイルを選択

処理中...
from js import document, console, Chart, Object, Uint8Array, Blob, URL, window from pyodide.ffi import create_proxy from pyodide.ffi.wrappers import add_event_listener from Bio import SeqIO from io import BytesIO import asyncio sequence_data = [] def reset_data(event=None): global sequence_data sequence_data = [] document.getElementById('result').innerHTML = '' # ファイル入力のリセット file_input = document.getElementById('fileInput') file_input.value = '' console.log("Data reset completed") def calc_alignment_score(seq1, seq2): """ 2つの配列間のアライメントスコアを計算 """ match_score = 1 mismatch_penalty = -1 gap_penalty = -0.5 # 文字列をリストに変換して処理 seq1_list = list(seq1) seq2_list = list(seq2) score = 0 for b1, b2 in zip(seq1_list, seq2_list): if b1 == '-' or b2 == '-': score += gap_penalty elif b1 == b2: score += match_score else: score += mismatch_penalty return score def visualize_alignment(sequences, labels=None): """アラインメントを視覚的に表示""" if labels is None: labels = [f"Seq{i+1}" for i in range(len(sequences))] # ラベルの最大長を取得(整列表示のため) max_label_len = max(len(label) for label in labels) # 各配列を表示 alignment_view = [] for label, seq in zip(labels, sequences): padding = " " * (max_label_len - len(label)) alignment_view.append(f"{label}{padding} : {seq}") return "\n".join(alignment_view) def reverse_complement(seq): """配列のリバース・コンプリメントを返す""" complement = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C', 'N': 'N', '-': '-'} return ''.join(complement.get(base, 'N') for base in reversed(seq)) def calc_alignment_score(seq1, seq2): # semi_global_alignと同一のスコアリング match_score = 1 mismatch_penalty = -1 gap_penalty = -2 # semi_global_alignと同様に-2を使用 # 以下は変更不要 seq1_list = list(seq1) seq2_list = list(seq2) # 長さ合わせ max_len = max(len(seq1_list), len(seq2_list)) seq1_list.extend(['-'] * (max_len - len(seq1_list))) seq2_list.extend(['-'] * (max_len - len(seq2_list))) score = 0 for b1, b2 in zip(seq1_list, seq2_list): if b1 == '-' or b2 == '-': score += gap_penalty elif b1 == b2: score += match_score else: score += mismatch_penalty return score def generate_consensus_from_alignment(aligned_seqs, quality_scores): """ 与えられたアラインメント(aligned_seqs)と対応するクオリティスコア(quality_scores)から、 コンセンサス配列と、そのコンセンサス配列位置ごとのクオリティスコアを返す。 aligned_seqs: ['ACGT-', 'A-GT-'] など、アラインされた配列群 quality_scores: [[q1_1, q1_2, ...], [q2_1, q2_2, ...]] 各配列のクオリティスコアリスト """ consensus = [] consensus_qual = [] length = len(aligned_seqs[0]) # 全て同じ長さのはず for i in range(length): bases = [] quals = [] for seq_idx, seq in enumerate(aligned_seqs): base = seq[i] if base != '-': # クオリティスコアインデックス計算(ギャップを除外した位置) qual_idx = i - seq[:i].count('-') if qual_idx < len(quality_scores[seq_idx]): bases.append(base) quals.append(quality_scores[seq_idx][qual_idx]) if not bases: # 全てギャップの場合 consensus.append('N') consensus_qual.append(0) else: # 同じ塩基ごとにクオリティ合計を計算 base_quality_sum = {} for b, q in zip(bases, quals): base_quality_sum[b] = base_quality_sum.get(b, 0) + q # 最もクオリティ合計が高い塩基を選択 best_base, best_qual = max(base_quality_sum.items(), key=lambda x: x[1]) consensus.append(best_base) consensus_qual.append(best_qual) return ''.join(consensus), consensus_qual def semi_global_align(seq1, seq2): match_score = 1 mismatch_score = -1 gap_penalty = -2 rows = len(seq1) + 1 cols = len(seq2) + 1 matrix = [[0 for _ in range(cols)] for _ in range(rows)] # スコアマトリックス計算 for i in range(1, rows): for j in range(1, cols): if seq1[i-1] == seq2[j-1]: diag = matrix[i-1][j-1] + match_score else: diag = matrix[i-1][j-1] + mismatch_score up = matrix[i-1][j] + gap_penalty left = matrix[i][j-1] + gap_penalty matrix[i][j] = max(diag, up, left) # 最終スコア探索(セミグローバル) max_score = float('-inf') max_pos = (rows-1, cols-1) # 最終行で最大スコア for j in range(cols): if matrix[rows-1][j] > max_score: max_score = matrix[rows-1][j] max_pos = (rows-1, j) # 最終列で最大スコア for i in range(rows): if matrix[i][cols-1] > max_score: max_score = matrix[i][cols-1] max_pos = (i, cols-1) i, j = max_pos i_end, j_end = i, j aligned1 = [] aligned2 = [] # トレースバック while i > 0 and j > 0: score_current = matrix[i][j] score_diag = matrix[i-1][j-1] score_up = matrix[i-1][j] score_left = matrix[i][j-1] if seq1[i-1] == seq2[j-1]: score_match = score_diag + match_score else: score_match = score_diag + mismatch_score if score_current == score_match: aligned1.append(seq1[i-1]) aligned2.append(seq2[j-1]) i -= 1 j -= 1 elif score_current == score_up + gap_penalty: aligned1.append(seq1[i-1]) aligned2.append('-') i -= 1 elif score_current == score_left + gap_penalty: aligned1.append('-') aligned2.append(seq2[j-1]) j -= 1 else: # スコア改善できない場合 break aligned1.reverse() aligned2.reverse() # トレースバック終了点が開始点に相当 i_start, j_start = i, j return ''.join(aligned1), ''.join(aligned2), max_score, i_start, i_end-1, j_start, j_end-1 def restore_full_alignment(seq1, seq2, aligned1, aligned2, i_start, i_end, j_start, j_end): """ semi_global_alignで整列された aligned1, aligned2 を、 元配列の全長をカバーするように前後に復元する。 i_start, i_end, j_start, j_endは、元配列seq1, seq2上で aligned1, aligned2が対応する領域の開始・終了インデックス(0-based)。 """ # i_end, j_end はsemi_global_align内で (i_end-1), (j_end-1) の形で返したので # i_end, j_endはaligned配列に対応する最後のインデックスを指すとする。 # seq1[i_start:i_end+1], seq2[j_start:j_end+1] がaligned部分に相当。 # 前後の未整列領域 prefix1 = seq1[:i_start] prefix2 = seq2[:j_start] suffix1 = seq1[i_end+1:] suffix2 = seq2[j_end+1:] # 前半の復元: # prefix部はアラインメント情報なしのため、単純に左右端をギャップで揃える。 # 長い方のprefixに合わせて短い方をギャップ埋め prefix_len_diff = len(prefix1) - len(prefix2) if prefix_len_diff > 0: # prefix1が長いのでprefix2側にギャップ追加 prefix_aligned1 = prefix1 prefix_aligned2 = '-' * prefix_len_diff + prefix2 else: # prefix2が長い、または同じ長さ prefix_aligned1 = '-' * (-prefix_len_diff) + prefix1 prefix_aligned2 = prefix2 # 後半の復元: suffix_len_diff = len(suffix1) - len(suffix2) if suffix_len_diff > 0: # suffix1が長いのでsuffix2側にギャップ追加 suffix_aligned1 = suffix1 suffix_aligned2 = suffix2 + '-' * suffix_len_diff else: # suffix2が長い、または同じ長さ suffix_aligned1 = suffix1 + '-' * (-suffix_len_diff) suffix_aligned2 = suffix2 # 全体の再構築 restored_aligned1 = prefix_aligned1 + aligned1 + suffix_aligned1 restored_aligned2 = prefix_aligned2 + aligned2 + suffix_aligned2 return restored_aligned1, restored_aligned2 def perform_assembly(sequence_data): try: sequences_with_qual = [ (data['sequence'], list(data['quality_scores']), data.get('file_name', f'Seq{i+1}')) for i, data in enumerate(sequence_data) ] if len(sequences_with_qual) < 2: console.log("Need at least two sequences for assembly") return None # 最初の配列をコンセンサスの出発点にする seq1, qual1, name1 = sequences_with_qual[0] # 2番目以降の配列を一つずつコンセンサスに取り込む for i in range(1, len(sequences_with_qual)): seq2, qual2, name2 = sequences_with_qual[i] # 正方向アライメント prealigned1, prealigned2, max_score_fwd, i_start, i_end, j_start, j_end = semi_global_align(seq1, seq2) aligned1_fwd, aligned2_fwd = restore_full_alignment(seq1, seq2, prealigned1, prealigned2, i_start, i_end, j_start, j_end) score_fwd = calc_alignment_score(aligned1_fwd, aligned2_fwd) # リバースコンプリメントアライメント seq2_rc = reverse_complement(seq2) qual2_rc = list(reversed(qual2)) prealigned1_rc, prealigned2_rc, max_score_rc, i_start_rc, i_end_rc, j_start_rc, j_end_rc = semi_global_align(seq1, seq2_rc) aligned1_rc, aligned2_rc = restore_full_alignment(seq1, seq2_rc, prealigned1_rc, prealigned2_rc, i_start_rc, i_end_rc, j_start_rc, j_end_rc) score_rc = calc_alignment_score(aligned1_rc, aligned2_rc) # スコアが高い方を採用 if score_fwd >= score_rc: aligned1, aligned2 = aligned1_fwd, aligned2_fwd used_qual2 = qual2 reversed_flag = False best_score = score_fwd else: aligned1, aligned2 = aligned1_rc, aligned2_rc used_qual2 = qual2_rc reversed_flag = True best_score = score_rc # 新しいコンセンサスを生成 consensus_seq, consensus_qual = generate_consensus_from_alignment( [aligned1, aligned2], [qual1, used_qual2] ) # 次の段階のためにコンセンサスを更新 seq1, qual1, name1 = consensus_seq, consensus_qual, f"Consensus_after_{name2}" # 全ての配列を統合したコンセンサスがseq1, qual1に格納される console.log("Final consensus sequence after hierarchical alignment:") console.log(seq1) return seq1 except Exception as e: console.log(f"Error in assembly: {str(e)}") console.log(f"Error details: {e.__dict__}") return None def export_fasta(sequences, assembled_sequence=None): """FASTAフォーマットでエクスポートする関数""" try: fasta_content = [] # 入力ファイル名から配列名を生成 file_names = [data.get('file_name', f'Sequence_{i+1}') for i, data in enumerate(sequence_data)] base_name = '_'.join([name.replace('.ab1', '') for name in file_names]) # 入力配列を追加 for file_name, seq in zip(file_names, sequences): header = file_name.replace('.ab1', '') fasta_content.append(f">{header}") fasta_content.append(seq) # アセンブル結果を追加 if assembled_sequence: assembled_header = f"Assembled_{base_name}" fasta_content.append(f">{assembled_header}") fasta_content.append(assembled_sequence) # FASTAファイルとしてダウンロード content = '\n'.join(fasta_content) # JavaScriptのBlobを使用してファイルを作成 blob = window.Blob.new([content], {'type': 'text/plain'}) url = window.URL.createObjectURL(blob) # ダウンロードリンクを作成 a = document.createElement('a') a.href = url a.download = f'{base_name}_assembly.fasta' a.click() window.URL.revokeObjectURL(url) except Exception as e: console.log(f"Error in export: {str(e)}") console.log(f"Error details: {e.__dict__}") # より詳細なエラー情報 # イベントハンドラの追加 def handle_assemble(event=None): try: if not sequence_data: window.alert("No sequences available for assembly") return # 配列をアセンブル(sequence_dataではなく、配列のリストを渡す) assembled_sequence = perform_assembly(sequence_data) # sequence_dataをそのまま渡す if assembled_sequence: # アセンブル結果を表示 result_div = document.getElementById('result') result_div.insertAdjacentHTML('beforeend', f"""

Assembled Sequence

Length: {len(assembled_sequence)}

Sequence:
{assembled_sequence}
""") # FASTAファイルとしてエクスポート sequences = [data['sequence'] for data in sequence_data] # 配列の抽出 export_fasta(sequences, assembled_sequence) else: window.alert("Assembly failed") except Exception as e: console.log(f"Error in handle_assemble: {str(e)}") console.log(f"Error details: {e.__dict__}") # 詳細なエラー情報 window.alert("Error during assembly and export") # HTML部分の更新 document.getElementById('dropZone').insertAdjacentHTML('afterend', """
""") def process_ab1_file(file_content, file_name): # file_name パラメータを追加 try: handle = BytesIO(file_content) record = SeqIO.read(handle, "abi") sequence = str(record.seq) # クオリティスコアの取得 try: quality_scores = record.letter_annotations.get("phred_quality", []) if not quality_scores: quality_scores = record.annotations.get('abif_raw', {}).get('PCON1', []) except Exception as e: console.log(f"Error getting quality scores: {str(e)}") quality_scores = [] # トレースデータを取得 channels = ['DATA9', 'DATA10', 'DATA11', 'DATA12'] traces = {} raw_data = record.annotations.get('abif_raw', {}) for i, channel in enumerate(['A', 'C', 'G', 'T']): trace_key = channels[i] if trace_key in raw_data: traces[channel] = raw_data[trace_key][:1000] return { 'file_name': file_name, # ファイル名を追加 'sequence': sequence, 'quality_scores': quality_scores, 'traces': traces } except Exception as e: console.log(f"Error in process_ab1_file: {str(e)}") return None def create_quality_chart(canvas_id, quality_scores): try: if not quality_scores: console.log(f"No quality scores available for {canvas_id}") return ctx = document.getElementById(canvas_id).getContext('2d') # PyProxyをネイティブJavaScriptの値に変換 js_quality_scores = list(map(int, quality_scores.to_py() if hasattr(quality_scores, 'to_py') else quality_scores)) js_labels = list(map(str, range(1, len(js_quality_scores) + 1))) js_config = { 'type': 'line', 'data': { 'labels': js_labels, 'datasets': [{ 'label': 'Quality Scores', 'data': js_quality_scores, 'borderColor': 'rgb(75, 192, 192)', 'tension': 0.1, 'pointRadius': 0 }] }, 'options': { 'responsive': True, 'scales': { 'y': { 'beginAtZero': True } } } } Chart.new(ctx, js_config) console.log("Quality chart created successfully") except Exception as e: console.log(f"Error creating quality chart: {str(e)}") console.log(f"Quality scores type: {type(quality_scores)}") def create_chromatogram(canvas_id, traces): try: ctx = document.getElementById(canvas_id).getContext('2d') datasets = [] colors = {'A': 'green', 'C': 'blue', 'G': 'black', 'T': 'red'} # PyProxyをネイティブJavaScriptの値に変換 js_traces = traces.to_py() if hasattr(traces, 'to_py') else traces for base, color in colors.items(): if base in js_traces: trace_data = list(map(int, js_traces[base][:1000])) datasets.append({ 'label': base, 'data': trace_data, 'borderColor': color, 'borderWidth': 1, 'pointRadius': 0 }) if datasets: first_dataset_length = len(datasets[0]['data']) js_labels = list(map(str, range(first_dataset_length))) js_config = { 'type': 'line', 'data': { 'labels': js_labels, 'datasets': datasets }, 'options': { 'responsive': True, 'animation': False, 'scales': { 'y': { 'beginAtZero': True } } } } Chart.new(ctx, js_config) console.log("Chromatogram created successfully") except Exception as e: console.log(f"Error creating chromatogram: {str(e)}") console.log(f"Traces type: {type(traces)}") async def read_file_as_bytes(file): try: array_buffer = await file.arrayBuffer() uint8_array = Uint8Array.new(array_buffer) return bytes(uint8_array) except Exception as e: console.log(f"Error reading file: {str(e)}") raise async def process_files(files): try: loading = document.getElementById('loading') result_div = document.getElementById('result') loading.style.display = 'block' result_div.innerHTML = '' for i in range(files.length): file = files.item(i) console.log(f"Processing file: {file.name}") file_content = await read_file_as_bytes(file) data = process_ab1_file(file_content, file.name) # ファイル名を渡す if data: sequence_data.append(data) container = document.createElement('div') container.className = 'sequence-container' quality_data = ','.join(map(str, data['quality_scores'])) trace_data = { base: ','.join(map(str, values[:1000])) for base, values in data['traces'].items() } container.innerHTML = f"""

{file.name}

Sequence length: {len(data['sequence'])}

Sequence:
{data['sequence']}

Chromatogram

Quality Scores

""" result_div.appendChild(container) # クロマトグラムのスクリプトを作成 chromatogram_script = document.createElement('script') chromatogram_script.text = f""" (function() {{ const ctx = document.getElementById('chromatogram_{i}').getContext('2d'); const traces = {{ 'A': [{trace_data.get('A', '')}], 'C': [{trace_data.get('C', '')}], 'G': [{trace_data.get('G', '')}], 'T': [{trace_data.get('T', '')}] }}; const datasets = []; const colors = {{ 'A': 'green', 'C': 'blue', 'G': 'black', 'T': 'red' }}; Object.entries(colors).forEach(([base, color]) => {{ if (traces[base] && traces[base].length > 0) {{ datasets.push({{ label: base, data: traces[base], borderColor: color, borderWidth: 1, pointRadius: 0 }}); }} }}); new Chart(ctx, {{ type: 'line', data: {{ labels: Array.from(Array(traces['A'].length).keys()), datasets: datasets }}, options: {{ responsive: true, maintainAspectRatio: false, // アスペクト比を固定しない animation: false, scales: {{ y: {{ beginAtZero: true, position: 'left', title: {{ display: true, text: 'Signal Intensity' }} }}, x: {{ title: {{ display: true, text: 'Base Position' }} }} }}, plugins: {{ legend: {{ position: 'top', labels: {{ boxWidth: 20, padding: 20 }} }} }} }} }}); }})(); """ document.getElementById(f'chromatogram_script_{i}').appendChild(chromatogram_script) # クオリティスコアのスクリプトを作成 quality_script = document.createElement('script') quality_script.text = f""" (function() {{ const ctx = document.getElementById('quality_{i}').getContext('2d'); const qualityScores = [{quality_data}]; new Chart(ctx, {{ type: 'line', data: {{ labels: Array.from(Array(qualityScores.length).keys()).map(x => (x + 1).toString()), datasets: [{{ label: 'Quality Scores', data: qualityScores, borderColor: 'rgb(75, 192, 192)', tension: 0.1, pointRadius: 0 }}] }}, options: {{ responsive: true, maintainAspectRatio: false, scales: {{ y: {{ beginAtZero: true, title: {{ display: true, text: 'Quality Score' }} }}, x: {{ title: {{ display: true, text: 'Base Position' }} }} }}, plugins: {{ legend: {{ position: 'top', labels: {{ boxWidth: 20, padding: 20 }} }} }} }} }}); }})(); """ document.getElementById(f'quality_script_{i}').appendChild(quality_script) else: error_div = document.createElement('div') error_div.style.color = 'red' error_div.textContent = f"Could not process file: {file.name}" result_div.appendChild(error_div) except Exception as e: console.log(f"Error in process_files: {str(e)}") error_div = document.createElement('div') error_div.style.color = 'red' error_div.textContent = f"Error processing files: {str(e)}" result_div.appendChild(error_div) finally: loading.style.display = 'none' async def handle_file_select(event): files = event.target.files await process_files(files) async def handle_drop(event): console.log("drop event triggered") try: dt = event.dataTransfer console.log("dataTransfer:", dt) files = dt.files console.log("files:", files) # filesがPyProxyの場合はlen(files)で長さが取れるか試す console.log("len(files):", len(files)) console.log("Types:", event.dataTransfer.types) console.log("Items:", event.dataTransfer.items) except Exception as e: console.log("Exception occurred:", str(e)) event.preventDefault() event.stopPropagation() files = event.dataTransfer.files files = event.dataTransfer.files await process_files(files) def handle_dragover(event): event.preventDefault() event.stopPropagation() event.dataTransfer.dropEffect = 'copy' event.target.classList.add('dragover') def handle_dragleave(event): event.preventDefault() event.target.classList.remove('dragover') # メイン関数 def main(): drop_zone = document.getElementById('dropZone') file_input = document.getElementById('fileInput') assemble_button = document.getElementById('assembleButton') reset_button = document.getElementById('resetButton') drop_zone.addEventListener('click', create_proxy(lambda e: file_input.click())) file_input.addEventListener('change', create_proxy(lambda e: asyncio.create_task(handle_file_select(e)))) drop_zone.addEventListener('drop', create_proxy(lambda e: asyncio.create_task(handle_drop(e)))) drop_zone.addEventListener('dragover', create_proxy(handle_dragover)) drop_zone.addEventListener('dragleave', create_proxy(handle_dragleave)) assemble_button.addEventListener('click', create_proxy(handle_assemble)) reset_button.addEventListener('click', create_proxy(lambda e: reset_data())) # スタイルの追加 document.head.insertAdjacentHTML('beforeend', """ """) # メイン関数を実行 main()