cleo.ioproc module¶
Basic processor definitions and control/estimation functions
- class cleo.ioproc.LatencyIOProcessor(sample_period: Quantity = 1. * msecond, sampling: str = 'fixed', processing: str = 'parallel')[source]¶
Bases:
IOProcessorIOProcessor capable of delivering stimulation some time after measurement.
Note
It doesn’t make much sense to combine parallel computation with “when idle” sampling, because “when idle” sampling only produces one sample at a time to process.
Method generated by attrs for class LatencyIOProcessor.
- get_ctrl_signals(t_query)[source]¶
Get per-stimulator control signal from the
IOProcessor.- Parameters:
t_query (Quantity) – Current simulation time.
- Returns:
A {‘stimulator_name’: ctrl_signal} dictionary for updating stimulators.
- Return type:
dict
- is_sampling_now(t_query)[source]¶
Determines whether the processor will take a sample at this timestep.
- Parameters:
t_now (Quantity) – Current time.
- Return type:
bool
- out_buffer: deque[Tuple[dict, float]]¶
“serial” computes the output time by adding the delay for a sample onto the output time of the previous sample, rather than the sampling time. Note this may be of limited utility because it essentially means the entire round trip cannot be in parallel at all. More realistic is that simply each block or phase of computation must be serial. If anyone cares enough about this, it will have to be implemented in the future.
Note
It doesn’t make much sense to combine parallel computation with “when idle” sampling, because “when idle” sampling only produces one sample at a time to process.
- Raises:
ValueError – For invalid sampling or processing kwargs
- abstract process(state_dict: dict, t_samp: Quantity) Tuple[dict, Quantity][source]¶
Process network state to generate output to update stimulators.
This is the function the user must implement to define the signal processing pipeline.
- Parameters:
state_dict (dict) – {recorder_name: state} dictionary from
get_state()t_samp (Quantity) – The time at which the sample was taken.
- Returns:
{‘stim_name’: ctrl_signal} dictionary and output time (including unit).
- Return type:
Tuple[dict, Quantity]
- processing: str¶
“serial” or “parallel”.
“parallel” computes the output time by adding the delay for a sample onto the sample time, so if the delay is 2 ms, for example, while the sample period is only 1 ms, some of the processing is happening in parallel. Output order matches input order even if the computed output time for a sample is sooner than that for a previous sample.
“serial” computes the output time by adding the delay for a sample onto the output time of the previous sample, rather than the sampling time. Note this may be of limited utility because it essentially means the entire round trip cannot be in parallel at all. More realistic is that simply each block or phase of computation must be serial. If anyone cares enough about this, it will have to be implemented in the future.
- Type:
Processing scheme
- put_state(state_dict: dict, t_samp: Quantity)[source]¶
Deliver network state to the
IOProcessor.- Parameters:
state_dict (dict) – A dictionary of recorder measurements, as returned by
get_state()t_samp (Quantity) – The current simulation timestep. Essential for simulating control latency and for time-varying control.
- sampling: str¶
“fixed” or “when idle”.
“fixed” sampling means samples are taken on a fixed schedule, with no exceptions.
“when idle” sampling means no samples are taken before the previous sample’s output has been delivered. A sample is taken ASAP after an over-period computation: otherwise remains on schedule.
- Type:
Sampling scheme
- t_samp: Quantity¶
Record of sampling times—each time
put_state()is called.
- class cleo.ioproc.RecordOnlyProcessor(sample_period, **kwargs)[source]¶
Bases:
LatencyIOProcessorTake samples without performing any control.
Use this if all you are doing is recording.
Method generated by attrs for class LatencyIOProcessor.
- out_buffer: deque[Tuple[dict, float]]¶
“serial” computes the output time by adding the delay for a sample onto the output time of the previous sample, rather than the sampling time. Note this may be of limited utility because it essentially means the entire round trip cannot be in parallel at all. More realistic is that simply each block or phase of computation must be serial. If anyone cares enough about this, it will have to be implemented in the future.
Note
It doesn’t make much sense to combine parallel computation with “when idle” sampling, because “when idle” sampling only produces one sample at a time to process.
- Raises:
ValueError – For invalid sampling or processing kwargs
- process(state_dict: dict, sample_time: float) Tuple[dict, float][source]¶
Process network state to generate output to update stimulators.
This is the function the user must implement to define the signal processing pipeline.
- Parameters:
state_dict (dict) – {recorder_name: state} dictionary from
get_state()t_samp (Quantity) – The time at which the sample was taken.
- Returns:
{‘stim_name’: ctrl_signal} dictionary and output time (including unit).
- Return type:
Tuple[dict, Quantity]
- processing: str¶
“serial” or “parallel”.
“parallel” computes the output time by adding the delay for a sample onto the sample time, so if the delay is 2 ms, for example, while the sample period is only 1 ms, some of the processing is happening in parallel. Output order matches input order even if the computed output time for a sample is sooner than that for a previous sample.
“serial” computes the output time by adding the delay for a sample onto the output time of the previous sample, rather than the sampling time. Note this may be of limited utility because it essentially means the entire round trip cannot be in parallel at all. More realistic is that simply each block or phase of computation must be serial. If anyone cares enough about this, it will have to be implemented in the future.
- Type:
Processing scheme
- sampling: str¶
“fixed” or “when idle”.
“fixed” sampling means samples are taken on a fixed schedule, with no exceptions.
“when idle” sampling means no samples are taken before the previous sample’s output has been delivered. A sample is taken ASAP after an over-period computation: otherwise remains on schedule.
- Type:
Sampling scheme
- cleo.ioproc.exp_firing_rate_estimate(spike_counts: UInt[ndarray, 'num_spike_sources'], dt: Quantity, prev_rate: Quantity, tau: Quantity) Quantity[source]¶
Estimate firing rate with a recursive exponential filter.
- Parameters:
spike_counts (np.ndarray) – n-length vector of spike counts
dt (Quantity) – Time since last measurement (with Brian temporal unit)
prev_rate (Quantity) – n-length vector of previously estimated firing rates
tau (Quantity) – Time constant of exponential filter (with Brian temporal unit)
- Returns:
n-length vector of estimated firing rates (with Brian units)
- Return type:
Quantity