0%

用RxJava实现事件总线RxBus并实现同类型事件的区分

之前事件总线都是用EventBus或者otto来做,现在RxJava越来越火了,用RxJava实现事件总线也是很方便的.不多说直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* RxBus
*/
public class RxBus {
private static volatile RxBus defaultInstance;
// 主题
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}
// 提供了一个新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObserverable (Class<T> eventType) {
return bus.ofType(eventType);
}
}

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<UserEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 处理异常
}
});

上面RxBus实现代码引用自:

用RxJava实现事件总线(Event Bus)](http://www.jianshu.com/p/ca090f6e2fe2/) 更详细信息可以点击查看

发现上面RxJava实现的RxBus确实很方便,代码也挺简洁.但是有个问题是我在使用EventBus以及otto的时候一直存在的疑问,就是当发送相同类型的事件或者消息的时候接收的时候怎么去区分?

基于这个问题我对上面的RxBus进行了改造.代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.cm.rxbus;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
* RxBus
* Created by CM on 2016-4-22 19:30:48.
*/
public class RxBus {
private static volatile RxBus defaultInstance;
// 主题
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}

/**
* 提供了一个新的事件,单一类型
* @param o 事件数据
*/
public void post (Object o) {
bus.onNext(o);
}

/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* @param eventType 事件类型
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}

/**
* 提供了一个新的事件,根据code进行分发
* @param code 事件code
* @param o
*/
public void post(int code, Object o){
bus.onNext(new Message(code,o));

}

/**
* 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
* @param code 事件code
* @param eventType 事件类型
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
return bus.ofType(Message.class)
.filter(new Func1<Message,Boolean>() {
@Override
public Boolean call(Message o) {
//过滤code和eventType都相同的事件
return o.getCode() == code &amp;&amp; eventType.isInstance(o.getObject());
}
}).map(new Func1<Message,Object>() {
@Override
public Object call(Message o) {
return o.getObject();
}
}).cast(eventType);
}
}

Message代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.cm.rxbus;

/**
* Created by CM on 2016/4/22.
*/
public class Message {
private int code;
private Object object;

public Message(<span />) {}

public Message(int code, Object o) {
this.code = code;
this.object = o;
}
//getter and setter
}

RxBus里添加了一个post(int code, Object o)方法,里面调用了bus.onNext(new Message(code,o));将code跟object用Message类进行了封装.

toObservable(final int code, final Class<T> eventType)对传入code的事件进行分发,先调用bus.ofType(Message.class)返回Message类的观察者,然后通过filter操作符返回Message里code跟Object类型跟传入的类型都匹配的观察者,再通过map操作符返回Message里的object对象,最后通过cast转化为特定类的观察者.

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
//注册观察者
Subscription subscription =RxBus.getDefault()
.toObservable(100,String.class)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("CM", s);
}
});

//发送事件或消息:
RxBus.getDefault().post(100, "123456");

这样如果有多个消息是相同类型的话就可以通过不同的code进行区分了.

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

1
2
3
4
5
6
7
@Override
protected void onDestroy() {
super.onDestroy();
if(!subscription .isUnsubscribed()) {
subscription .unsubscribe();
}
}
 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!