# ########################################################################## #
# #
# HANGKÁRTYA ALAPÚ IMPEDANCIA ÉS ÁTVITEL MÉRŐ ALKALMAZÁS #
# www.hangdobozepites.hu - 2022-2024 #
# #
# ########################################################################## #
#
# P R O G R A M B E Á L L Í T Á S O K
#
#---------------------------Mérési mód választás------------------------------
#MODE = 'Z' # Impedanciamednet mérése
#MODE = 'H' # Ch1/Ch2 relatív frekvencia-átvitel mérés
#MODE = 'C' # Kalibrálás nyitott kapcsokkal
#MODE = 'S' # Kalibrálás zárt kapcsokkal
#MODE = 'R' # Kalibrálás referencia ellenállással
MODE = None # Kiválasztás futásidőben
#
#---------------------------Kalibráció használata-----------------------------
#USING_CALIBRATE = True # kalibráció használata
USING_CALIBRATE = None # Kiválasztás futásidőben
#
#-----------------------------Sweep beállítások-------------------------------
FREQ_START = 5 #sweep kezdő frekvencia [Hz]
FREQ_END = 20000 #sweep vég frekvencia [Hz]
SWEEP_TIME = 0 #sweep löket ideje [sec] (csak ha FR_RES_PER_OCT=None)
FR_RES_PER_OCT = 24 #frekvencia tartomány felbontása 1/oktáv-ban
#FR_RES_PER_OCT = None #None esetén a SWEEP_TIME lesz használva
#
#--------------------------Sweep beállítások - haladó-------------------------
FSTART_TIME = 1.0 #előrezgetési idő [sec]
FEND_TIME = 0.2 #utórezgetési idő [sec]
FADING = True #előrezegtetés/utórezegtetés fel/le keveréssel
#
#-----------------------------Hardware beállítások----------------------------
SAMPLE_RATE = 48*1000 #hangkártya mintavételi frekvencia
BLOCKSIZE = 2*1024 #stream blokk méret
LR_DELAY = 0 # Vonal bemeneten a jobb csatorna késése a balhoz képest
#
#-------------------------------------JIG-------------------------------------
OUTPUT_LEVEL = 1 #relatív szoftveres jelszint (1=fullscale)
OUTPUT_LEVEL_LEFT = 1 #kimeneti jelszint a bal csatornan
OUTPUT_LEVEL_RIGHT =-1 #kimeneti jelszint a jobb csatornan
INPUT_CH_MEAS = 0 # Meres bementi csatornaszam
INPUT_CH_REF = 1 # Referencia bementi csatornaszam
R_SENSE = 100 #soros mérőellenállás [Ohm]
R_INPUT = 13.0E3 #behringer UCA222 line input [Ohm]
#CS_INPUT = 0E-6 #bemeneti csatoló kondenzátor [F]
#R_INPUT = 6.8E3 # Behringer UMC404HD insert input
#R_INPUT = 20.0E3 # Rubix22 input
R_INPUT = None #kikapcsolva (3 pontos kalibráció esetén már nem kell!)
CS_INPUT = None #kikapcsolva (3 pontos kalibráció esetén már nem kell!)
R_CAL = 1000 #külső kalibrló (referencia) ellenállás értéke (Ohm)
#
#----------------------------Szinkronizáció beállítása------------------------
MARGIN_TIME = 1.0 #üres jel bevezetőjelnek [sec] (ebbe kerül a szinkronjel is)
SYNC_POS_TIME = 0.25 #szinkron jel kezdete [sec]
#
#----------------------------Integráló szűrő beállítása-----------------------
#INT_FILTER_CUTOFF = FREQ_START # cutoff [Hz]
INT_FILTER_CUTOFF = FREQ_START * (2**0.5) # cutoff [Hz]
#INT_FILTER_CUTOFF = FREQ_START * 2 # cutoff [Hz]
#
#--------------------------ZMA/FRD (csv) file paraméterek---------------------
CSV_NUMBER_OF_ROWS = 1000 #200000 # maximum adatsorok a mentésben
CSV_COLUMN_SEPARATOR = '\t' #'; ' # elválasztó karakter
CSV_DECIMAL_FORMAT = '{:.8f}' # decimális formátum
#
#-------------------------------Grafikon (Matplotlib)-------------------------
USING_TIME_GRAPHS = True #Kevés RAM és hosszabb (30mp+) méréseknél célszerű False-ra állítani
GRAPH_MAX_RES = 2000 #Fekvenciafüggvények felbontása csak grafikonozáskor
#(matplotlib szereti elterminálni a programot, ha besokal)
#----------------------------------Debug kapcsolók----------------------------
DISABLE_STREAMING = False #sd.Playrec() metódus használata sd.Stream() helyett
#(Nem lesz folyamatjelző ablak!)
DEBUG_STREAMING = False #hangkártya stream helyett korábbi record.wav-ot dolgozza fel
#
#-----------------------------Mérőkapcsolás segítség---------------------------
#
# Bekötési példa Passzív kábelre 100 Ohm ellenállással
# (alacsony impedanciás ~50 Ohm fejhallgató kimenetről kell hajtani)
#
# ____________
# | <--
# L o<──────────────┐
# Line In R o<───────┐ │
# Zin>10k G o │ │
# | │ │
# | --> │Rsense│
# L o<───────┴[100Ω]┴──────────┐+/
# Phones R o 0.1% [ ]) Speaker
# 50Ohms G o<─────────────────────────┘-\
# ____________|
# Soundcard
#
#
# Max frekvenciák (fs >= f_max * 2,13):
# ----------------------------------------------------------------------------
# Sample_rate Freq_max
# ----------------------------------------------------------------------------
# 192000 Hz 90000 Hz
# 176400 Hz 82500 Hz
# 96000 Hz 45000 Hz
# 88200 Hz 41000 Hz
# 48000 Hz 22500 Hz
# 44100 Hz 20500 Hz
# ----------------------------------------------------------------------------
#
# Sweep idők 1/24 oktávos integráló szűrő esetére:
# ----------------------------------------------------------------------------
# f [Hz] sweep RAM igény
# löket (@Mintavételi fr.)
# [sec]
# ----------------------------------------------------------------------------
# 1-100 113s 42MiB @48kHz
# 5-100 15s 6MiB @48kHz
# 10-100 6s 3MiB @48kHz
# 20-100 2s 1MiB @48kHz
#
# 1-20k 243s 89MiB @48kHz
# 5-20k 41s 15MiB @48kHz
# 10-20k 19s 7MiB @48Khz
# 20-20k 9s 4MiB @48kHz
#
# 1-40k 260s 193MiB @96kHz
# 5-40k 45s 35MiB @96kHz
# 10-40k 21s 18MiB @96kHz
# 20-40k 10s 10MiB @96kHz
#
# 1-90k 280s 411MiB @192kHz
# 5-90k 48s 71MiB @192kHz
# 10-90k 23s 34MiB @192kHz
# 20-90k 11s 17MiB @192kHz
# ----------------------------------------------------------------------------
# A simítás növelése/csökkentése arányosan
# hat a löketidőre és a stream RAM igényre
# ----------------------------------------------------------------------------
#
# Windows ASIO támogatás beállítása:
#
# Add hozzá a portaudio bin könyvtárad a windows %PATH% környezeti változóhoz!
# (c:\Users\***\AppData\Local\Programs\Python\Python###\Lib\site-packages\_sounddevice_data\portaudio-binaries\)
#
#
##############################################################################
import numpy as np
from scipy.io.wavfile import write as write_wav, read as read_wav
from scipy.signal import square, correlate, convolve, firwin, windows
import matplotlib.pyplot as plt
import matplotlib.ticker as ptck
from itertools import cycle
import os
import time as time1
import math
from si_prefix import si_format as si
print('-------------------------------------------------------')
print(' Hangkártya alapú méricske alkalmazás ')
print(' 2022-2024 www.hangdobozepites.hu ')
print('-------------------------------------------------------')
print()
if LR_DELAY is None:
LR_DELAY = 0
modes = {
'Z' : 'Impedancia mérés',
'H' : 'Frekvencia-átvitel mérés',
'C' : 'Kalibrálás nyitott kapcsokkal (1. fázis)',
'S' : 'Kalibrálás zárt kapcsokkal (2. fázis)',
'R' : 'Kalibrálás referencia ellenállással (3. fázis)',
}
while not (type(MODE) is str and MODE in modes):
MODE = input('Mérési mód kiválasztása (Z/h/c/s/r/?) : ').strip().upper()
if MODE=='':
MODE='Z'
if MODE=='?':
print(' Módok:')
for _m in modes:
print (f' {_m} (vagy {_m.lower()}) - {modes[_m]}')
print(f'Választott mérési mód: {modes[MODE]}')
if FREQ_END > int(SAMPLE_RATE/1065)*500:
print(f'FREQ_END={FREQ_END}Hz túl nagy, ha {SAMPLE_RATE}Hz a mintavételezési frekvencia!')
FREQ_END = int(SAMPLE_RATE/1065)*500
print(f'Újaraszámolt érték:{FREQ_END}Hz')
if False and MODE in ['C','R']:
OUTPUT_LEVEL /= 2
if os.name=='nt':
os.environ['SD_ENABLE_ASIO']='1'
class ProgressBarPopupGui:
def __init__(self, maxtime):
import threading
self._time = 0
self._maxtime = maxtime
self._terminate = False
self._meters = [np.float64(0)]*2
self._msg = ''
threading.Thread(target=self._start, daemon=True).start()
def _start(self):
import threading
import tkinter as tk
from tkinter import ttk
self.root = tk.Tk()
self.root.protocol('WM_DELETE_WINDOW', self.on_closing)
self.root.attributes('-topmost', True)
self.root.resizable(0,0)
self.root.title('Mérés folyamata')
self.root.grid()
ttk.Label(self.root,text='Bal csúcsjel').grid(column=0, row=1, pady=[10,1], padx=5,sticky='w')
self.pb_meter1 = ttk.Progressbar(self.root, orient='horizontal', length=480)
self.pb_meter1.grid(column=1, row=1, columnspan=2,padx=[0,5], pady=[10,1])
ttk.Label(self.root,text='Jobb csúcsjel').grid(column=0, row=2, pady=[0,20], padx=5,sticky='w')
self.pb_meter2 = ttk.Progressbar(self.root, orient='horizontal', length=480)
self.pb_meter2.grid(column=1, columnspan=2,row=2, padx=[0,5], pady=[0,20])
self.lb_elapsed_time = ttk.Label(self.root,text='')
self.lb_elapsed_time.grid(column=0, row=3, padx=5, pady=[0,0], sticky='w')
self.lb_remain_time = ttk.Label(text='')
self.lb_remain_time.grid(column=2, row=3, padx=5, pady=[0,0], sticky='e')
self.pb = ttk.Progressbar(self.root, orient='horizontal', length=590)
self.pb.grid(column=0,columnspan=3, row=4, padx=5, pady=[0,0])
self.lb_status = ttk.Label(self.root,text='')
self.lb_status.grid(column=0,columnspan=3, row=5, padx=5, pady=[0,0], sticky='w')
threading.Thread(target=self._progress, daemon=True).start()
self.root.mainloop()
def _progress(self):
import time as time1
_se0 = ''
_sr0 = ''
_t0 = time1.time()
_lb0 = ''
while True:
time1.sleep(0.02)
_t = min(self._maxtime, self._time)
self.pb['value'] = 100*_t/self._maxtime
_te = min(self._maxtime, _t)
_te_min=int(_te // 60)
_te_sec=int(_te % 60)
_tr = max(0,self._maxtime-_t)
_tr_min=int(_tr // 60)
_tr_sec=int(_tr % 60)
_se = f'Eltelt idő:{_te_min:3d}:{_te_sec:02d}'
_sr = f'Hártalévő idő:{_tr_min:3d}:{_tr_sec:02d}'
self.pb_meter1['value'] = self._meters[0]*100
self.pb_meter2['value'] = self._meters[1]*100
if _se != _se0:
_se0 = _se
self.lb_elapsed_time['text']=_se
if _sr != _sr0:
_sr0 = _sr
self.lb_remain_time['text']=_sr
if self._msg != _lb0:
_lb0 = self._msg
#self.lb_status['text']='Haladás... ' + self._msg
self.lb_status['text'] = self._msg
if self._terminate:
break
self.root.destroy()
def terminate(self):
self._terminate = True
def set(self, t, msg, met):
self._time = t
self._msg = msg
self._meters[:]=met[:]
def on_closing(self):
self._meters[:]=[0,0]
class Streaming:
def __init__(self, sound_data):
import threading
self.data = sound_data
self.volume_ch0 = OUTPUT_LEVEL * OUTPUT_LEVEL_LEFT
self.volume_ch1 = OUTPUT_LEVEL * OUTPUT_LEVEL_RIGHT
self.volume = np.transpose(np.array([self.volume_ch0,self.volume_ch1]))
self.samplerate = SAMPLE_RATE
self.blocksize = BLOCKSIZE
self.frames = None
self.is_active = None
self.elapsed_time = 0
self.thr = threading.Thread(target=self._start, daemon=True)
self.thr.start()
self.error = False
self.errorstr = ''
self.device_name = None
def _start(self):
import sounddevice as sd
#Audio eszköz választás (kézi)
print('Audio eszközök listázása:')
device_last = None
dev_in_nr = -1
dev_out_nr = -1
if os.path.exists('dev.txt'):
with open('dev.txt','r') as _f:
device_last=_f.read().split('\r')[0].split('\n')[0]
if device_last != '':
dev_in_nr = device_last.split(',')[0]
dev_out_nr = device_last.split(',')[-1]
for _host in sd.query_hostapis():
if len(_host['devices'])>0:
print ("\n"+_host['name'])
for _devnr in _host['devices']:
_dev = sd.query_devices()[_devnr]
_inout = ''
if _dev['max_input_channels']>=2 and _dev['max_output_channels']>=2:
if float(dev_in_nr)==float(_devnr) and float(dev_out_nr)==float(_devnr):
_inout = '<=> Dup'
elif float(dev_in_nr)==float(_devnr):
_inout = ' -> Dup'
elif float(dev_out_nr)==float(_devnr):
_inout = '<- Dup'
else:
_inout = ' Dup'
elif _dev['max_input_channels']>=2 and _dev['max_output_channels']==0:
if float(dev_in_nr)==float(_devnr):
_inout = ' -> In '
else:
_inout = ' In '
elif _dev['max_input_channels']==0 and _dev['max_output_channels']>=2:
if float(dev_out_nr)==float(_devnr):
_inout = '<- Out'
else:
_inout = ' Out'
if _inout != '':
print(f" {_devnr:2d} {_inout:7s} {_dev['name']:s}, {_dev['max_input_channels']} in, {_dev['max_output_channels']} out")
if device_last is not None:
print (f'\nElőző választás {device_last} - (üres bevitel = marad!)\n')
while True:
device_str = input('Eszköz választása [duplex] vagy [in,out] : ').split('\r')[0].split('\n')[0].replace(' ','')
if device_str=='' and device_last is not None:
device_str=device_last
device = np.fromstring(device_str, dtype=int, sep=',').tolist()
if len(device)==0:
device=None
self.device_name = 'Alapértelmezett eszköz'
print(' -> Rendszer alapértelmezett eszköz használata')
break
elif len(device)==1:
device=device[0]
_dd=sd.query_devices()[device]
self.device_name = _dd['name']+' '+sd.query_hostapis(_dd['hostapi'])['name']
print(' -> Duplex eszköz: {:s}, ({:d} bemenet, {:d} kimenet), {:s}'.format(_dd['name'],_dd['max_input_channels'],_dd['max_output_channels'],['Nincs elég csatorna!','--OK--'][_dd['max_input_channels']>=2 and _dd['max_output_channels']>=2]))
if _dd['max_input_channels']>=2 and _dd['max_output_channels']>=2: break
elif len(device)==2:
_db=sd.query_devices()[device[0]]
_dk=sd.query_devices()[device[1]]
self.device_name = _db['name']+' '+sd.query_hostapis(_db['hostapi'])['name']
print(' -> Bemeneti eszköz: {:s}, ({:d} bemenet), {:s}'.format(_db['name'],_db['max_input_channels'],['Nincs elég csatorna!','--OK--'][_db['max_input_channels']>=2]))
print(' -> Kimeneti eszköz: {:s}, ({:d} kimenet), {:s}'.format(_dk['name'],_dk['max_output_channels'],['Nincs elég csatorna!','--OK--'][_dk['max_output_channels']>=2]))
if _db['max_input_channels']>=2 and _dk['max_output_channels']>=2: break
print('Hibás a választás!')
if device_str != '':
with open('dev.txt','w') as _f:
_f.write(device_str)
if MODE=='C':
print('---------------------------------------------')
print('- FIGYELEM! -')
print('- A mérőkapcsok ne legyenek csatlakoztatva! -')
print('---------------------------------------------')
input("Ha kész, nyomj Enter-t!")
elif MODE=='S':
print('---------------------------------------------')
print('- FIGYELEM! -')
print('- A mérőkapcsokat zárd össze! -')
print('---------------------------------------------')
input("Ha kész, nyomj Enter-t!")
elif MODE=='R':
print('---------------------------------------------')
print('- FIGYELEM! -')
print('- Kösd be a referencia ellenállást! -')
print('---------------------------------------------')
print(f'Referencia ellenállás értéke:{si(R_CAL)}Ohm')
input('Ha kész, nyomj Enter-t!')
print('Lejátszás/felvétel indul... ')
if DISABLE_STREAMING:
self.data[:] = sd.playrec(self.data, device=device,samplerate=self.samplerate,channels=2,dtype='float32',blocksize=self.blocksize,blocking=True)
else:
self.pb = ProgressBarPopupGui(total_length/fs)
self.is_active = True
self.frames = 0
self.elapsed_time = np.float64(0)
#Playback & record start
try:
with sd.Stream(device=device,samplerate=self.samplerate,channels=2,dtype='float32',blocksize=self.blocksize,callback=self._callback):
while self.is_active:
time1.sleep(0.1)
except Exception as e:
self.error = True
self.errorstr = str(e)
finally:
self.pb.terminate()
sd._terminate()
print(' ...kész')
def _callback(self, indata, outdata, frames, time_, status):
if self.frames + frames <= len(self.data):
outdata[:] = self.data[self.frames:self.frames+frames] * self.volume
self.data[self.frames:self.frames+frames] = indata[:]
self.frames += frames
else:
if len(self.data)%self.blocksize>0:
self.data[self.frames:self.frames+(len(self.data)%self.blocksize)] = indata
self.is_active = False
self.elapsed_time += frames/self.samplerate
if self.frames < sweep_position:
sweep_msg = 'Margó / szinkron'
elif self.frames >= sweep_position+sweep_length:
sweep_msg = 'Margó'
else:
sweep_msg = 'Sweep frekvencia: ' + si(f[self.frames-sweep_position],2)+'Hz'
if self.frames < sweep_position+sample_start:
sweep_msg += ' - előrezegtetés'
elif self.frames > sweep_position+sweep_length-sample_end:
sweep_msg += ' - utórezegtetés'
self.pb.set(self.elapsed_time, sweep_msg, [max(np.absolute(indata[:,0])), max(np.absolute(indata[:,1]))])
def WaitForComplette(self):
self.thr.join()
self._checkError()
return self
def _checkError(self):
if self.error:
raise Exception('\n\nStream hiba történt!\n'+self.errorstr+'\nA program leállt.')
#Sweep jel generálás
fs = SAMPLE_RATE
f0 = FREQ_START
f1 = FREQ_END
t1 = SWEEP_TIME if FR_RES_PER_OCT is None else math.ceil(FR_RES_PER_OCT * np.log2(f1/f0) / INT_FILTER_CUTOFF)
print('Stream adatok:')
print(f' Mintavételi frekvencia: {si(fs)}Hz')
print(f' Pufferméret: {BLOCKSIZE} Samples')
print(f' Pufferidő: {si(BLOCKSIZE/fs,2)}sec')
print(f' Kimeneti jelszint: {OUTPUT_LEVEL:.2f} = {20*np.log10(OUTPUT_LEVEL):.2f} dBFS')
print('Sweep jel adatok:')
print(f' Sweep tartomány: {si(f0)}Hz ... {si(f1)}Hz')
print(f' Sweep löket: {t1} sec')
sample_start = int(FSTART_TIME*fs)
sample_sweep = int(fs * t1)
sample_end=int(FEND_TIME*fs)
sweep_length = sample_start + sample_sweep + sample_end
margin_left = int(MARGIN_TIME * fs)
sync_position = int(SYNC_POS_TIME * fs)
sweep_position = margin_left
margin_right = margin_left - sync_position
total_length = math.ceil((margin_left + sweep_length + margin_right)/BLOCKSIZE)*BLOCKSIZE
smooting = INT_FILTER_CUTOFF * t1 / np.log2(f1/f0)
print('Mérőjel adatok:')
print(f' Mérőjel teljes ideje: {total_length/fs:.2f} sec')
print(f' Mérőjel RAM igénye: {total_length*8/1024**2:.0f} MB')
print(f' Integráló szűrő töréspontja: {INT_FILTER_CUTOFF:.2f} Hz')
print(f' Integráló szűrő simítása: {smooting:.2f} [1/oktáv]')
cal_open_filename = f'cal/calibrate_FS={fs}_F0={f0:d}_F1={f1:d}_t1={t1:d}_v4.1.npy'
cal_short_filename = f'cal/calibrate_short_FS={fs:d}_F0={f0:d}_F1={f1:d}_t1={t1}_rs={R_SENSE:f}_v4.1.npy'
cal_shorth_filename = f'cal/calibrate_shorth_FS={fs:d}_F0={f0:d}_F1={f1:d}_t1={t1}_rs={R_SENSE:f}_v4.1.npy'
cal_ref_filename = f'cal/calibrate_ref_FS={fs:d}_F0={f0:d}_F1={f1:d}_t1={t1:d}_rs={R_SENSE:f}_v4.1.npy'
if not os.path.isdir('cal'):
os.makedirs('cal')
if MODE=='S':
USING_CALIBRATE = True
if not os.path.isfile(cal_open_filename):
raise Exception('A zárt kapcsú kalibrációhoz először a nyitott kapcsú kalibráció elvégzése szükséges!')
if MODE=='R':
USING_CALIBRATE = True
if not os.path.isfile(cal_open_filename) or not os.path.isfile(cal_shorth_filename):
raise Exception('Referencia kalibrációhoz először a nyitott és zárt kapcsú kalibráció elvégzése szükséges!')
if USING_CALIBRATE is None and MODE!='C' and os.path.isfile(cal_open_filename):
USING_CALIBRATE = input('Kalibráció használata? (I/n) : ').strip().lower() in ['i', 'igen','', 'y', 'yes']
if USING_CALIBRATE:
print('Kalibráció bekapcsolva')
print("Mérőjel generálása... ")
t_sweep = np.linspace(0, t1, sample_sweep)
f_sweep = np.logspace(np.log10(f0), np.log10(f1), len(t_sweep))
f = np.concatenate((np.ones(sample_start)*f0, f_sweep, np.ones(sample_end)*f1))
phase = np.cumsum(2*np.pi*f/fs)
base_complex = np.exp(phase*-1j)*1j
phase = None
print(' ...kész')
#Szinkronjel generálás
sync_freq = 1000
sync_periods = 10
sync_length = int(fs*sync_periods/sync_freq)
sync_signal = np.float32(square(2.0*np.pi*sync_freq*np.arange(sync_length)/fs))
#Playback jel összeállítása
sound_data = np.float32(np.zeros((total_length,2)))
sound_data[sync_position:sync_position+sync_length] = np.transpose([sync_signal]*2)
sound_data[sweep_position:sweep_position+sweep_length] = np.transpose([base_complex.real.astype(np.float32)]*2)
#Fel/le keverés (tranziens csillapítás)
if FADING and sample_start >= 2:
sound_data[sweep_position:sweep_position+sample_start] *= np.transpose([windows.hann(sample_start*2)[0:sample_start]]*2)
if FADING and sample_end >= 2:
sound_data[sweep_position+sweep_length-sample_end:sweep_position+sweep_length] *= np.transpose([windows.hann(sample_end*2)[sample_end:]]*2)
if False and input('Mentsünk a lejátszandó audiót WAV fileba? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
print('Wav file írása...')
write_wav('playback.wav', fs, sound_data)
print(' ...kész')
if not DEBUG_STREAMING:
device_name = Streaming(sound_data).WaitForComplette().device_name
else:
_, sound_data[:] = read_wav('record.wav') #DEBUG - korabbi mentett wav-ot dolgozza fel meres helyett
device_name = 'Debug - record.wav'
#Jelfeldolgozás
print('Szinkronjel keresése... ')
corr = np.abs(correlate(sound_data[:sweep_position, INPUT_CH_REF], sync_signal, mode='valid'))
maxpos = np.argmax(corr)
maxval = int(corr[maxpos]+0.5)
sample_offset = sync_position - maxpos - LR_DELAY
corr = None
print(' ...kész')
print(f'Szinkronjel match {maxval} limit >30')
if maxval < 30:
raise Exception('Túl kicsi a jel, nem található a szinkron!')
else:
print('Szinkronjel OK')
print(f'Audio streem késése: {sample_offset:d} sample')
print(f' {1000*sample_offset/fs:.2f} msec')
#Szinusz korreláció
corr_complex_meas = sound_data[sweep_position-sample_offset:sweep_position-sample_offset+sweep_length, INPUT_CH_MEAS] * base_complex
corr_complex_ref = sound_data[sweep_position-sample_offset+LR_DELAY:sweep_position-sample_offset+sweep_length+LR_DELAY, INPUT_CH_REF] * base_complex
base_complex = None
# ~ write_wav('corr.wav', fs, np.array([corr_complex_meas.real, corr_complex_meas.imag]).transpose())
#Integráló szűrő (FIR LP)
cutoff = INT_FILTER_CUTOFF
win_size = int(fs / cutoff * 10 + 0.5)
print(f'Integráló szűrő kernel hossz: {win_size:d} minta')
print(f' {1000*win_size/fs:.2f} msec')
print('Szűrés alkalmazása...')
time_stamp = time1.time()
win = firwin(win_size, cutoff/fs) * windows.hann(win_size)
corr_complex_meas = convolve(corr_complex_meas, win, mode='same')
corr_complex_ref = convolve(corr_complex_ref, win, mode='same')
print(' ...kész')
print(f'Szűrés ideje: {time1.time()-time_stamp:.3f} sec')
#Átviteli függvény számítása
H = corr_complex_meas[sample_start:sample_start+sample_sweep] / corr_complex_ref[sample_start:sample_start+sample_sweep]
corr_complex_meas = None
corr_complex_ref = None
cal_msg = ''
#open kalibráció betöltése és alkalmazása
if MODE != 'C' and USING_CALIBRATE:
if os.path.isfile(cal_open_filename):
print('Nyitott kapcsú kalibráció betöltése')
H /= np.load(cal_open_filename)
cal_msg += 'O'
else:
print('Nincs nyitott kapcsú kalibráció erre a beállításra!')
#Sort kalibráció betöltése
z_short = 0
if MODE in ['Z','R'] and USING_CALIBRATE:
if os.path.isfile(cal_short_filename):
print('Zárt kapcsú kalibráció betöltése')
z_short = np.load(cal_short_filename)
cal_msg += 'S'
else:
print('Nincs zárt kapcsú kalibráció erre a beállításra!')
#z_sense számítás vagy referencia kalibralas betoltese
z_sense = R_SENSE
if MODE=='Z' and USING_CALIBRATE and os.path.exists(cal_ref_filename):
print('Referencia kalibráció betöltése')
z_sense = np.load(cal_ref_filename)
cal_msg += 'R'
elif not R_INPUT is None and MODE in ['S','Z']:
if not CS_INPUT is None:
print('Z sense újraszámítása R input és Cs imput alapján')
z_sense = 1/(1/((-1j/(2*np.pi*f_sweep*CS_INPUT))+R_INPUT)+1/R_SENSE)
else:
print('Z sense újraszámítása R input alapján')
z_sense = 1/(1/R_SENSE+1/R_INPUT)
#Referencia ellenállással kalibrálás - z_short újraszámolása
if MODE=='R':
Hs = np.load(cal_shorth_filename)
z_short = R_CAL*Hs*(1-H)/(H-Hs)
Hs = None
np.save(cal_short_filename, z_short)
print('Zárt kapcsú kalibráció frissitése...')
#Impedancia függvény számítása
Z = (z_sense / (1/H - 1))-z_short if MODE in ['Z','S'] else (R_CAL + z_short) * (1/H - 1) if MODE=='R' else None
z_short = None
if cal_msg == '':
cal_msg = 'nincs'
#open kalibráció mentése
if MODE=='C':
print('Kalibrációs file írása...')
np.save(cal_open_filename, H)
print(' ...kész')
if os.path.exists(cal_shorth_filename):
print('Korábbi short átvitel file törlése')
os.remove(cal_shorth_filename)
if os.path.exists(cal_ref_filename):
print('Korábbi referencia kalibrálás file törlése')
os.remove(cal_ref_filename)
#sort kalibráció mentése
if MODE=='S':
print('Zárt kapcsú kalibráció mentése...')
np.save(cal_short_filename, Z)
np.save(cal_shorth_filename, H)
print(' ...kész')
if MODE=='R':
print('Referencia kalibráció mentése...')
np.save(cal_ref_filename, Z)
print(' ...kész')
#Grafikonozás
print ('Grafikonok megjelenítés')
figure, axis = plt.subplots(3 if USING_TIME_GRAPHS else 1, 1)
figure.suptitle(f"{device_name} {si(fs).replace(' ','')}Hz Kal={cal_msg} Lök={t1}s Sim={smooting:.2f} [1/okt]", size=9, ha='center', va='bottom', x=0.5, y=0)
if not isinstance(axis, np.ndarray):
axis=np.array([axis])
axis = cycle(axis.flat)
# idő graf.
if USING_TIME_GRAPHS:
ax = next(axis)
ax.plot(t_sweep, sound_data[sweep_position+sample_start-sample_offset:sweep_position+sample_start-sample_offset+len(t_sweep), 0])
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.set_ylabel('Bal bemenet')
print(' - bal bemenet időgrafikon megjelenítése')
ax = next(axis)
ax.plot(t_sweep, sound_data[sweep_position+sample_start-sample_offset:sweep_position+sample_start-sample_offset+len(t_sweep), 1])
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.set_ylabel('Jobb bemenet')
print(' - jobb bemenet időgrafikon megjelenítése')
# frekv. graf.
nunber_of_datas = GRAPH_MAX_RES
resample = np.int32(np.linspace(0,len(f_sweep)-1,min(nunber_of_datas, len(f_sweep)))+0.5)
ax = next(axis)
if MODE in ['Z','S','R']:
#ax.semilogx(f_sweep[resample], np.absolute(Z[resample]))
ax.loglog(f_sweep[resample], np.absolute(Z[resample]))
else:
ax.semilogx(f_sweep[resample], 20*np.log10(np.absolute(H[resample])))
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.grid(which='minor', axis='y', linestyle=':')
if MODE in ['Z','S','R']:
ax.set_ylabel('Impedancia (Ohm)')
print(' - impedancia frekvencia-grafikon megjelenítése')
else:
ax.set_ylabel('Átvitel (dB)')
print(' - átvitel frekvencia-grafikon megjelenítése')
ax = ax.twinx()
ax.semilogx(f_sweep[resample], np.rad2deg(np.angle((Z if MODE in ['Z','S','R'] else H)[resample])), color='orange', linestyle='dashed')
ax.set_ylim(-95,95)
ax.set_yticks([-90,-45,0,45,90])
ax.grid()
ax.grid(which='major', axis='y', linestyle='dashed')
ax.set_ylabel('fázis (fok)')
plt.show()
#Nyquist görbe Z méréskor
if True and MODE in ['Z','H']:# and input('Nyquist görbe rajzolása? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
if MODE=='Z':
plt.axes().set_aspect('equal')#, 'datalim')
plt.plot(np.real(Z[resample]), np.imag(Z[resample]))
plt.grid()
plt.title('Impedancia helygörbe')
plt.ylabel('Im (Ohm)')
plt.xlabel('Re (Ohm)')
print(' - impedancia nyquest diagram megjelenítése')
plt.show()
#Nyquist görbe átvitel méréskor
if MODE=='H':
plt.axes().set_aspect('equal')#, 'datalim')
plt.plot(np.real(H[resample]), np.imag(H[resample]))
plt.grid()
plt.title('Átvitel helygörbe')
plt.ylabel('Im')
plt.xlabel('Re')
print(' - átvitel nyquest diagram megjelenítése')
plt.show()
#Kapacitás/induktivitás számítása
if True and MODE=='Z':# and input('Kapacitás és induktivitás számítása? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
figure, axis = plt.subplots(2,3)
axis = cycle(axis.flat)
ax = next(axis)
X=Z[resample].imag
R=Z[resample].real
for i in range(2):
modestr='s' if i==0 else 'p'
ax.semilogx(f_sweep[resample], 1E3*X/(2*np.pi*f_sweep[resample]))
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.grid(which='minor', axis='y', linestyle=':')
ax.set_ylim(0,None)
ax.set_title(f'L{modestr}[mH]')
ax = next(axis)
ax.semilogx(f_sweep[resample], 1E6/(-X*2*np.pi*f_sweep[resample]))
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.grid(which='minor', axis='y', linestyle=':')
ax.set_ylim(0,None)
ax.set_title(f'C{modestr}[uF]')
ax = next(axis)
ax.semilogx(f_sweep[resample], R)
ax.grid()
ax.grid(which='minor', axis='x', linestyle=':')
ax.grid(which='minor', axis='y', linestyle=':')
ax.set_ylim(0,None)
ax.set_title(f'R{modestr}[Ohm]')
ax = next(axis)
X=-1/((1/Z[resample]).imag)
R=1/((1/Z[resample]).real)
print(' - L/C/R számított grafikonok megjelenítése')
plt.show()
#csv mentés - előkészítés
resample = np.int32(np.linspace(0,len(f_sweep)-1,min(CSV_NUMBER_OF_ROWS, len(f_sweep)-1), endpoint=True)+0.5)
f_resampled = f_sweep[resample]
#mentés ZMA fileba
if True and MODE in ['Z','S','R']:# and input('Mentsün az impedancimenetet ZMA fileba? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
print ('ZMA file írása...')
Z_resampled = Z[resample]
exported_datas = np.transpose(np.array([f_resampled,np.absolute(Z_resampled),np.angle(Z_resampled,deg=True)]))
with open('impedancia.zma','w') as fzma:
fzma.write('Hz'+CSV_COLUMN_SEPARATOR+'Ohm'+CSV_COLUMN_SEPARATOR+'Deg\n')
fzma.write('\n'.join([CSV_COLUMN_SEPARATOR.join([CSV_DECIMAL_FORMAT.format(cell) for cell in row]) for row in exported_datas]))
print(' ...kész')
#mentés FRD fileba
if True and MODE in ['H','C']:# and input('Mentsün az átvitelt FRD fileba? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
print ('FRD file írása...')
H_resampled = H[resample]
exported_datas = np.transpose(np.array([f_resampled,20*np.log10(np.absolute(H_resampled)),np.angle(H_resampled,deg=True)]))
with open('atvitel_'+MODE+'.frd','w') as fzma:
fzma.write('Hz'+CSV_COLUMN_SEPARATOR+'dB'+CSV_COLUMN_SEPARATOR+'Deg\n')
fzma.write('\n'.join([CSV_COLUMN_SEPARATOR.join([CSV_DECIMAL_FORMAT.format(cell) for cell in row]) for row in exported_datas]))
print(' ...kész')
#stream mentése
if True:# and input('Mentsük a felvett audiót WAV fileba? (i/N) : ').strip().lower() in ['i', 'igen', 'y', 'yes']:
print('Wav file írása...')
write_wav('record.wav', fs, sound_data)
print(' ...kész')
print('--Program vége--')