public class
DeviceSensorManager
{
2
3
private
Observable
<
SensorEvent
>
accelerometerEventObservable
;
4
5
public
DeviceSensorManager
(
Context context
) {
6
accelerometerEventObservable
=
7
Observable
.
create
(
emitter
-> {
8
SensorEventListener sensorEventListener
=
9
new
SensorEventListener
() {
10
11
@Override
public
void
onSensorChanged
(
SensorEvent e
) {
12
if
(
e
.
sensor
.
getType
() ==
Sensor
.
TYPE_ACCELEROMETER
) {
13
emitter
.
onNext
(
e
);
14
}
15
}
16
17
@Override
public
void
onAccuracyChanged
(
18
Sensor sensor
,
19
int
accuracy
20
) { }
21
};
22
23
SensorManager sensorManager
=
24
(
SensorManager
)
context
.
getSystemService
(
25
Context
.
SENSOR_SERVICE
26
);
27
sensorManager
.
registerListener
(
28
sensorEventListener
,
29
sensorManager
.
getDefaultSensor
(
30
Sensor
.
TYPE_ACCELEROMETER
31
),
32
SensorManager
.
SENSOR_DELAY_NORMAL
33
);
34
35
// Clean up when there are no longer any subscribers
36
emitter
.
setCancellable
(() -> {
37
sensorManager
.
unregisterListener
(
Chapter 6: Backpressure
87
38
sensorEventListener
39
);
40
});
41
}
42
}).
share
();
43
}
44
45
public
Observable
<
SensorEvent
>
accelerometerEventObservable
() {
46
return
accelerometerEventObservable
;
47
}
48
}
When creating a
DeviceSensorManager
, we pass it a
Context
so that we can obtain a handle
to a
SensorManager
. Given the
SensorManager
, we then set a
SensorEventListener
that listens
for accelerometer events (i.e. events of type
Sensor.TYPE_ACCELEROMETER
) at a rate defined by
SensorManager.SENSOR_DELAY_NORMAL
. This listener is then wrapped in an
Observable
and all
accelerometer events are emitted down the
Observable
stream. Lastly, the
Observable
is multicasted
using
.share()
so that multiple
SensorEventListener
s are not registered on each subscription.
The listener is also unregistered from the
SensorManager
when there are no longer any subscribers
(i.e. the lambda inside
emitter.setCancellable(...)
will be invoked when all observers of the
Observable
are unsubscribed/disposed).
The rate at which accelerometer events are emitted is defined by
SensorManager.SENSOR_DELAY_-
NORMAL
(i.e.
∼
20 milliseconds). But say for our needs we wanted to consume events at a much slower
rate; how might we do that? Surely, we can attach a separate listener with the desired rate but what
if we want to use the same
Observable
? There are a couple of options to achieve this, one of which
is to use the filtering operator
.sample()
.
Sample & Throttle
.sample(long, TimeUnit)
works by periodically sampling events at a specified rate by emitting the
latest item in a period while dropping all other items.
Chapter 6: Backpressure
88
Marble diagram of .sample()
.throttleFirst()
and
.throttleLast()
on the other hand can be used to only emit the first or last
item, respectively, over a sequential time period.
Marble diagram of .throttleFirst()
Chapter 6: Backpressure
89
Marble diagram of .throttleLast()
Buffering
Another strategy to help mitigate problems with chatty producers is to buffer or collect emissions
and emit the result downstream at a less frequent rate. Two useful operators for achieving this are
.buffer()
and
.window()
.
Buffer & Window
As the name implies,
.buffer(...)
allows you to buffer emissions from upstream into a list.
.buffer()
is overloaded to support buffering given different criteria, a few handy ones are:
•
Observable.buffer(int count)
: buffers items
count
items
•
Observable.buffer(ObservableSource boundary)
: buffers items in between emis-
sions from
boundary
•
Observable.buffer(long timespan, TimeUnit timeUnit)
: buffers items that are emitted in a
given timespan
Chapter 6: Backpressure
90
Marble diagram of .buffer()
A similar operator to
.buffer()
,
.window()
, allows you to emit
Observable
s instead of lists.
.window()
is also overloaded and allows buffers given different criteria.
Do'stlaringiz bilan baham: |