

import pyvisa
import time
import csv
from datetime import datetime
import os
import argparse
import sys
# ------------------------------------------------------------------------------
# Command-line argument parsing
# This section defines and parses command-line arguments, allowing users to
# customize the scan parameters (filename, frequency range, step size) when
# running the script.
# ------------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="Spectrum Analyzer Sweep and CSV Export")
# Define an argument for the prefix of the output CSV filename
parser.add_argument('--SCANname', type=str, default="25kz scan ",
help='Prefix for the output CSV filename')
# Define an argument for the start frequency
parser.add_argument('--startFreq', type=float, default=400e6,
help='Start frequency in Hz')
# Define an argument for the end frequency
parser.add_argument('--endFreq', type=float, default=650e6,
help='End frequency in Hz')
# Define an argument for the step size
parser.add_argument('--stepSize', type=float, default=25000,
help='Step size in Hz')
# Add an argument to choose who is running the program (apk or zap)
parser.add_argument('--user', type=str, choices=['apk', 'zap'], default='zap',
help='Specify who is running the program: "apk" or "zap". Default is "zap".')
# Parse the arguments provided by the user
args = parser.parse_args()
# Assign parsed arguments to variables for easy access
file_prefix = args.SCANname
start_freq = args.startFreq
end_freq = args.endFreq
step = args.stepSize
user_running = args.user
# Define the waiting time in seconds
WAIT_TIME_SECONDS = 300 # 5 minutes
# ------------------------------------------------------------------------------
# Main program loop
# The entire scanning process will now run continuously with a delay.
# ------------------------------------------------------------------------------
while True:
# --------------------------------------------------------------------------
# VISA connection setup
# This section establishes communication with the spectrum analyzer using the
# PyVISA library, opens the specified instrument resource, and performs initial
# configuration commands.
# --------------------------------------------------------------------------
# Define the VISA address of the spectrum analyzer. This typically identifies
# the instrument on the bus (e.g., USB, LAN, GPIB).
# Define the VISA address of the spectrum analyzer. This typically identifies
# the instrument on the bus (e.g., USB, LAN, GPIB).
apk_visa_address = 'USB0::0x0957::0xFFEF::CN03480580::0::INSTR'
zap_visa_address = 'USB1::0x0957::0xFFEF::SG05300002::0::INSTR'
if user_running == 'apk':
visa_address = apk_visa_address
else: # default is 'zap'
visa_address = zap_visa_address
# Create a ResourceManager object, which is the entry point for PyVISA.
rm = pyvisa.ResourceManager()
try:
# Open the connection to the specified instrument resource.
inst = rm.open_resource(visa_address)
print(f"Connected to instrument at {visa_address}")
# Clear the instrument's status byte and error queue.
inst.write("*CLS")
# Reset the instrument to its default settings.
inst.write("*RST")
# Query the Operation Complete (OPC) bit to ensure the previous commands have
# finished executing before proceeding. This is important for synchronization.
inst.query("*OPC?")
inst.write(":POWer:GAIN ON")
print("Preamplifier turned ON.")
inst.write(":POWer:GAIN 1") # '1' is equivalent to 'ON'
print("Preamplifier turned ON for high sensitivity.")
# Configure the display: Set Y-axis scale to logarithmic (dBm).
inst.write(":DISP:WIND:TRAC:Y:SCAL LOG")
# Configure the display: Set the reference level for the Y-axis.
inst.write(":DISP:WIND:TRAC:Y:RLEV -30")
# Enable Marker 1. Markers are used to read values at specific frequencies.
inst.write(":CALC:MARK1 ON")
# Set Marker 1 mode to position, meaning it can be moved to a specific frequency.
inst.write(":CALC:MARK1:MODE POS")
# Activate Marker 1, making it ready for use.
inst.write(":CALC:MARK1:ACT")
# Set the instrument to single sweep mode.
# This ensures that after each :INIT:IMM command, the instrument performs one
# sweep and then holds the trace data until another sweep is initiated.
inst.write(":INITiate:CONTinuous OFF")
# Pause execution for 2 seconds to allow the instrument to settle after configuration.
time.sleep(2)
# --------------------------------------------------------------------------
# File & directory setup
# This section prepares the output directory and generates a unique filename
# for the CSV export based on the current timestamp and user-defined prefix.
# --------------------------------------------------------------------------
# Define the directory where scan results will be saved.
# It creates a subdirectory named "N9340 Scans" in the current working directory.
scan_dir = os.path.join(os.getcwd(), "N9340 Scans")
# Create the directory if it doesn't already exist. `exist_ok=True` prevents
# an error if the directory already exists.
os.makedirs(scan_dir, exist_ok=True)
# Generate a timestamp for the filename to ensure uniqueness.
timestamp = datetime.now().strftime("%Y%m%d_%H-%M-%S")
# Construct the full path for the output CSV file.
filename = os.path.join(scan_dir, f"{file_prefix}--{timestamp}.csv")
# --------------------------------------------------------------------------
# Sweep and write to CSV
# This is the core logic of the script, performing the frequency sweep in
# segments, reading data from the spectrum analyzer, and writing it to the CSV.
# --------------------------------------------------------------------------
# Define the width of each frequency segment for sweeping.
# Sweeping in segments helps manage memory and performance on some instruments.
segment_width = 10_000_000 # 10 MHz
# Convert step size to integer, as some instrument commands might expect integers.
step_int = int(step)
# Convert end frequency to integer, for consistent comparison in loops.
scan_limit = int(end_freq)
# Open the CSV file in write mode (`'w'`). `newline=''` prevents extra blank rows.
with open(filename, mode='w', newline='') as csvfile:
# Create a CSV writer object.
writer = csv.writer(csvfile)
# Initialize the start of the current frequency block.
current_block_start = int(start_freq)
# Loop through frequency blocks until the end frequency is reached.
while current_block_start < scan_limit:
# Calculate the end frequency for the current block.
current_block_stop = current_block_start + segment_width
# Ensure the block stop doesn't exceed the overall scan limit.
if current_block_stop > scan_limit:
current_block_stop = scan_limit
# Print the current sweep range to the console for user feedback.
print(f"Sweeping range {current_block_start / 1e6:.3f} to {current_block_stop / 1e6:.3f} MHz")
# Set the start frequency for the instrument's sweep.
inst.write(f":FREQ:START {current_block_start}")
# Set the stop frequency for the instrument's sweep.
inst.write(f":FREQ:STOP {current_block_stop}")
# Initiate a single immediate sweep.
inst.write(":INIT:IMM")
# Query Operation Complete to ensure the sweep has finished before reading markers.
# This replaces the fixed time.sleep(2) for more robust synchronization.
inst.query("*OPC?")
# Initialize the current frequency for data point collection within the block.
current_freq = current_block_start
# Loop through each frequency step within the current block.
while current_freq <= current_block_stop:
# Set Marker 1 to the current frequency.
inst.write(f":CALC:MARK1:X {current_freq}")
# Query the Y-axis value (level in dBm) at Marker 1's position.
# .strip() removes any leading/trailing whitespace or newline characters.
level_raw = inst.query(":CALC:MARK1:Y?").strip()
try:
# Attempt to convert the raw level string to a float.
level = float(level_raw)
# Format the level to one decimal place for consistent output.
level_formatted = f"{level:.1f}"
# Convert frequency from Hz to MHz for readability.
freq_mhz = current_freq / 1_000_000
# Print the frequency and level to the console.
print(f"{freq_mhz:.3f} MHz : {level_formatted} dBm")
# Write the frequency and formatted level to the CSV file.
writer.writerow([freq_mhz, level_formatted])
except ValueError:
# If the raw level cannot be converted to a float (e.g., if it's an error message),
# use the raw string directly.
level_formatted = level_raw
# Optionally, you might want to log this error or write a placeholder.
print(f"Warning: Could not parse level '{level_raw}' at {current_freq / 1e6:.3f} MHz")
writer.writerow([current_freq / 1_000_000, level_formatted])
# Increment the current frequency by the step size.
current_freq += step_int
# Move to the start of the next block.
current_block_start = current_block_stop
except pyvisa.VisaIOError as e:
print(f"VISA Error: Could not connect to or communicate with the instrument: {e}")
print("Please ensure the instrument is connected and the VISA address is correct.")
# Decide if you want to exit or retry after a connection error
# For now, it will proceed to the wait and then try again.
except Exception as e:
print(f"An unexpected error occurred during the scan: {e}")
# Continue to the wait or exit if the error is critical
finally:
# ----------------------------------------------------------------------
# Cleanup
# This section ensures that the instrument is returned to a safe state and
# the VISA connection is properly closed after the scan is complete.
# ----------------------------------------------------------------------
if 'inst' in locals() and inst.session != 0: # Check if inst object exists and is not closed
try:
# Attempt to send the instrument to local control.
inst.write("SYST:LOC")
except pyvisa.VisaIOError:
pass # Ignore if command is not supported or connection is already broken
finally:
inst.close()
print("Instrument connection closed.")
# Print a confirmation message indicating the scan completion and output file.
if 'filename' in locals(): # Only print if filename was successfully created
print(f"\nScan complete. Results saved to '{filename}'")
# --------------------------------------------------------------------------
# Countdown and Interruptible Wait
# --------------------------------------------------------------------------
print("\n" + "="*50)
print(f"Next scan in {WAIT_TIME_SECONDS // 60} minutes.")
print("Press Ctrl+C at any time during the countdown to interact.")
print("="*50)
seconds_remaining = WAIT_TIME_SECONDS
skip_wait = False
while seconds_remaining > 0:
minutes = seconds_remaining // 60
seconds = seconds_remaining % 60
# Print countdown, overwriting the same line
sys.stdout.write(f"\rTime until next scan: {minutes:02d}:{seconds:02d} ")
sys.stdout.flush() # Ensure the output is immediately written to the console
try:
time.sleep(1)
except KeyboardInterrupt:
sys.stdout.write("\n") # Move to a new line after Ctrl+C
sys.stdout.flush()
choice = input("Countdown interrupted. (S)kip wait, (Q)uit program, or (R)esume countdown? ").strip().lower()
if choice == 's':
skip_wait = True
print("Skipping remaining wait time. Starting next scan shortly...")
break # Exit the countdown loop
elif choice == 'q':
print("Exiting program.")
sys.exit(0) # Exit the entire script
else:
print("Resuming countdown...")
# Continue the loop from where it left off
seconds_remaining -= 1
if not skip_wait:
# Clear the last countdown line
sys.stdout.write("\r" + " "*50 + "\r")
sys.stdout.flush()
print("Starting next scan now!")
print("\n" + "="*50 + "\n") # Add some spacing for clarity between cycles