I already have code using a thread pool, Tkinter, and matplotlib to process signals that are being written to a file by another process. Synchronization between the two processes is done by reading and writing the same file, using the following function, which busy-waits until a newline is written to the file.

def readline_char_by_char(file, fig):
    line = ''
    while True:
        char = file.read(1)
        if not char:
            fig.canvas.flush_events()
            #time.sleep(0.01)
            continue
        line += char
        if char == '\n':
            return line
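(The commented-out `time.sleep(0.01)` hints at a lighter-weight variant: instead of spinning on one-byte reads, poll with `readline()` and sleep between attempts so the reader does not peg a CPU core. A minimal sketch, assuming a plain text file; the `fig.canvas.flush_events()` call would still be needed in the GUI thread to keep Tk responsive:)

```python
import time

def readline_polling(file, poll_interval=0.01):
    """Block until a complete line is available, sleeping between polls
    instead of busy-waiting on single-character reads."""
    line = ''
    while True:
        line += file.readline()   # returns '' or a partial line at EOF
        if line.endswith('\n'):
            return line
        time.sleep(poll_interval)  # yield the CPU while waiting for the writer
```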

We then compute values from each chunk of the signal. The following is the core loop of my code.

while True:
    try:
        line = readline_char_by_char(file, fig)
        x_str, y_str = line.split()[:2]  # parse the line once
        signalx.append(float(x_str))
        signaly.append(float(y_str))
        if len(signaly) == 1000:
            num_signal += 1
            pulse_label.config(text=f"Number of Pulses: {num_signal}")
            if num_signal % 1000 == 0:
                print(f"Updating: {num_signal}")
            #update_first_plot(axs[0][0])

            first_plot_line_removal(axs[0][0], lines)

            lines[0].set_data(signalx, signaly)
            filtered_signal_y = filter_fft(signaly)

            save_filtered_signal(outfile, filtered_signal_y)
            success, istart, fintercept, y_sub_bipolar, y_baseline_restored, y_shifted, y_attenuated = find_true_cfd(np.multiply(-1, signaly), d)
            lines[1].set_data(signalx, np.multiply(-1, y_shifted))
            lines[2].set_data(signalx, y_sub_bipolar)
            first_plot_vertical_line_drawing(axs[0][0], istart, l1, d, l3)

            if istart < 0 or not success:
                signalx = []
                signaly = []
                continue
            i_delayed, i_total, y_filtered_baselineCorrected = charge_comparison_method(signalx, filtered_signal_y, istart, delayed_start=istart + l2, end=istart + l3)
            I_Delayed.append(i_delayed)
            I_Total.append(i_total)
            # if len(I_Delayed) % 1000 == 0:
            #     print("Inside I_Delayed vs I_Total scatter")
            #     sc_return_scatter(I_Delayed, I_Total, axs[0][1])

            TOF.append(fintercept)
            true_cfd_out_file.write(f"{fintercept}\n")
            integration_file.write(f"{i_delayed} {i_total}\n")

            energy = slope * i_total + intercept
            psd = i_delayed / i_total
            energy_file.write(f"{energy}\n")
            psd_file.write(f"{psd}\n")
            energy_psd_file.write(f"{energy} {psd}\n")
            PSD.append(psd)
            Energy.append(energy)

            # if success and istart > 0:
            #     pulse_x.append(signalx.copy())
            #     pulse_y.append(np.roll(np.multiply(-1, y_baseline_restored.copy()), -istart))
            #     filtered_y_signals.append(np.roll(y_filtered_baselineCorrected.copy(), -istart))

            if len(PSD) % 1000 == 0:
                # print("Inside PSD")
                #app.after(0, update_plots, I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
                update_plots(I_Delayed, I_Total, Energy, PSD, TOF, axs, fig)
    except Exception as exc:
        # The pasted snippet ends before its except clause; a minimal
        # handler is assumed here so the try block is syntactically complete.
        print(exc)
        break
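(For context, the PSD figure computed in this loop reduces to a ratio of two integrals over the same pulse: the delayed tail over the total charge. A hypothetical minimal version is sketched below; the author's real `charge_comparison_method` also does baseline correction and uses the x-axis spacing, so this is only an illustration with unit sample spacing:)

```python
import numpy as np

def charge_comparison_sketch(y, start, delayed_start, end):
    """Hypothetical minimal charge comparison: integrate the whole pulse
    and its delayed tail with a plain rectangular sum."""
    y = np.asarray(y, dtype=float)
    i_total = float(np.sum(y[start:end]))
    i_delayed = float(np.sum(y[delayed_start:end]))
    return i_delayed, i_total
```

The loop's derived quantities then follow directly: `psd = i_delayed / i_total` and `energy = slope * i_total + intercept`.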

asked Jan 30 at 12:37 by Ayan Banerjee; edited Jan 30 at 14:23 by bigreddot
  • It sounds like you have an interesting use case, but I'm not quite sure what the question is. – mdurant, Jan 30 at 14:18
  • I have implemented a file-reading mechanism that reads a text file line by line; after every 1000 lines my event loop runs two algorithms to compute two parameters and updates a 2D scatter plot with those values. I want to scale the file reading and the Matplotlib updating using Dask. – Ayan Banerjee, Jan 30 at 19:08

1 Answer


Your workflow is quite complex, so I have some thoughts for you rather than an answer.

  • I am not sure whether it is the data reading, the calculation, or the graph updating that is slow.

  • Reading bytes should never consume your CPU time; this is pure latency. Polling is fine (streamz's own file reader does this), but there is also aiofiles and similar, or the built-in [loop methods](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_reader) which you can use.

  • Pure-Python code does not parallelize well, and I suspect you are working with very little data here, so function-call overheads will dominate.

  • streamz's integration with dask is via the futures interface of distributed, which adds considerable overhead, not least for moving data to and from the cluster. Before trying to integrate with your GUI, I would play with distributed alone, to see if you can achieve better performance.
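To illustrate the `loop.add_reader` suggestion: the event loop invokes a callback only when the descriptor becomes readable, so no CPU is burned waiting. One caveat worth noting: `add_reader` works for pipes and sockets, not regular files (regular files are always "readable" to the selector, and epoll rejects them), so this sketch reads from a pipe; the writer process would need to write into a FIFO or pipe rather than a plain file.

```python
import asyncio
import os

async def read_lines_from_pipe(rfd):
    """Collect whole lines from a pipe without polling: the event loop
    fires the callback only when rfd has data (or EOF) available."""
    loop = asyncio.get_running_loop()
    os.set_blocking(rfd, False)
    done = loop.create_future()
    lines = []
    buf = b""

    def on_readable():
        nonlocal buf
        data = os.read(rfd, 4096)
        if not data:                      # writer closed the pipe: EOF
            loop.remove_reader(rfd)
            done.set_result(None)
            return
        buf += data
        *complete, buf = buf.split(b"\n")  # keep any trailing partial line
        lines.extend(part.decode() for part in complete)

    loop.add_reader(rfd, on_readable)
    await done
    return lines
```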
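As a starting point for that experiment: `distributed.Client.submit` mirrors the stdlib `concurrent.futures` submit/result interface, so the per-chunk work can be prototyped locally first and swapped onto a cluster later. A sketch with a hypothetical `process_chunk` standing in for the filter + CFD + charge-comparison pipeline:

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunk(signaly):
    # Hypothetical stand-in for filter_fft + find_true_cfd +
    # charge_comparison_method: returns a PSD-like ratio for the chunk.
    total = sum(signaly)
    delayed = sum(signaly[len(signaly) // 2:])
    return delayed / total if total else 0.0

with ThreadPoolExecutor(max_workers=4) as pool:
    # distributed.Client.submit has the same shape:
    #   future = client.submit(process_chunk, chunk)
    futures = [pool.submit(process_chunk, [1.0] * 1000) for _ in range(4)]
    results = [f.result() for f in futures]
```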
