之前写了一篇用RxJava实现事件总线RxBus并实现同类型事件的区分用RxJava实现了事件总线RxBus,并实现了通过code进行相同事件类型的区分。但是发现使用起来还是不怎么方便,没有eventbus那样用起来方便。
我们来看一下之前是怎么用的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Subscription subscription =RxBus.getDefault() .toObservable(100,String.class) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.e("CM", s); } });
RxBus.getDefault().post(100, "123456");
@Override protected void onDestroy() { super.onDestroy(); if(!subscription .isUnsubscribed()) { subscription .unsubscribe(); } }
|
确实没有EventBus用起来方便。于是对之前的RxBus又进行了改造,参考了EventBus的使用方法。
添加了注解类@Subscribe
对方法进行注解:
1 2 3 4 5 6 7
| @Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface Subscribe { int code() default -1; ThreadMode threadMode() default ThreadMode.CURRENT_THREAD; }
|
code进行相同事件的区分,threadMode用来设置事件处理所在的线程.
ThreadMode代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public enum ThreadMode {
CURRENT_THREAD,
MAIN,
NEW_THREAD
}
|
另外添加了一个SubscriberMethod
类,用于保存@Subscribe
注解的方法的相关信息(参考EventBus的SubscriberMethod).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class SubscriberMethod { public Method method; public ThreadMode threadMode; public Class<?> eventType; public Object subscriber; public int code;
public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode) { this.method = method; this.threadMode = threadMode; this.eventType = eventType; this.subscriber = subscriber; this.code = code; }
public void invoke(Object o){ try { method.invoke(subscriber, o); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } }
}
|
接下来看看重点RxBus的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
| package com.cm.rxbus;
import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map;
import rx.Observable; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.SerializedSubject; import rx.subjects.Subject;
public class RxBus { private static volatile RxBus defaultInstance;
private Map<Class,List<Subscription>> subscriptionsByEventType = new HashMap<>();
private Map<Object,List<Class>> eventTypesBySubscriber = new HashMap<>();
private Map<Class,List<SubscriberMethod>> subscriberMethodByEventType = new HashMap<>();
private final Subject bus; public RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); } public static RxBus getDefault() { RxBus rxBus = defaultInstance; if (defaultInstance == null) { synchronized (RxBus.class) { rxBus = defaultInstance; if (defaultInstance == null) { rxBus = new RxBus(); defaultInstance = rxBus; } } } return rxBus; }
public void post (Object o) { bus.onNext(o); }
public <T> Observable<T> toObservable(Class<T> eventType) { return bus.ofType(eventType); }
public void post(int code, Object o){ bus.onNext(new Message(code,o));
}
public <T> Observable<T> toObservable(final int code, final Class<T> eventType) { return bus.ofType(Message.class) .filter(new Func1<Message,Boolean>() { @Override public Boolean call(Message o) { return o.getCode() == code && eventType.isInstance(o.getObject()); } }).map(new Func1<Message,Object>() { @Override public Object call(Message o) { return o.getObject(); } }).cast(eventType); }
public void register(Object subscriber){ Class<?> subClass = subscriber.getClass(); Method[] methods = subClass.getDeclaredMethods(); for(Method method : methods){ if(method.isAnnotationPresent(Subscribe.class)){ Class[] parameterType = method.getParameterTypes(); if(parameterType != null && parameterType.length == 1){
Class eventType = parameterType[0];
addEventTypeToMap(subscriber,eventType); Subscribe sub = method.getAnnotation(Subscribe.class); int code = sub.code(); ThreadMode threadMode = sub.threadMode();
SubscriberMethod subscriberMethod = new SubscriberMethod(subscriber,method,eventType, code,threadMode); addSubscriberToMap(eventType, subscriberMethod);
addSubscriber(subscriberMethod); } } } }
private void addEventTypeToMap(Object subscriber, Class eventType){ List<Class> eventTypes = eventTypesBySubscriber.get(subscriber); if(eventTypes == null){ eventTypes = new ArrayList<>(); eventTypesBySubscriber.put(subscriber,eventTypes); }
if(!eventTypes.contains(eventType)){ eventTypes.add(eventType); } }
private void addSubscriberToMap(Class eventType, SubscriberMethod subscriberMethod){ List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType); if(subscriberMethods == null){ subscriberMethods = new ArrayList<>(); subscriberMethodByEventType.put(eventType,subscriberMethods); }
if(!subscriberMethods.contains(subscriberMethod)){ subscriberMethods.add(subscriberMethod); } }
private void addSubscriptionToMap(Class eventType, Subscription subscription){ List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if(subscriptions == null){ subscriptions = new ArrayList<>(); subscriptionsByEventType.put(eventType,subscriptions); }
if(!subscriptions.contains(subscription)){ subscriptions.add(subscription); } }
public void addSubscriber(final SubscriberMethod subscriberMethod){ Observable observable ; if(subscriberMethod.code == -1){ observable = toObservable(subscriberMethod.eventType); }else{ observable = toObservable(subscriberMethod.code,subscriberMethod.eventType); }
Subscription subscription = postToObservable(observable,subscriberMethod) .subscribe(new Action1<Object>() { @Override public void call(Object o) { callEvent(subscriberMethod.code,o); } }); addSubscriptionToMap(subscriberMethod.eventType ,subscription); }
private Observable postToObservable(Observable observable, SubscriberMethod subscriberMethod) {
switch (subscriberMethod.threadMode) { case MAIN: observable.observeOn(AndroidSchedulers.mainThread()); break;
case NEW_THREAD: observable.observeOn(Schedulers.newThread()); break; case CURRENT_THREAD: observable.observeOn(Schedulers.immediate()); break; default: throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode); } return observable; }
private void callEvent(int code, Object object){ Class eventClass = object.getClass(); List<SubscriberMethod> methods = subscriberMethodByEventType.get(eventClass); if(methods != null && methods.size() > 0){ for(SubscriberMethod subscriberMethod : methods){
Subscribe sub = subscriberMethod.method.getAnnotation(Subscribe.class); int c = sub.code(); if(c == code){ subscriberMethod.invoke(object); }
} } }
public void unRegister(Object subscriber){ List<Class> subscribedTypes = eventTypesBySubscriber.get(subscriber); if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unSubscribeByEventType(eventType); unSubscribeMethodByEventType(subscriber,eventType); } eventTypesBySubscriber.remove(subscriber); } }
private void unSubscribeByEventType(Class eventType){ List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { Iterator<Subscription> iterator = subscriptions.iterator(); while(iterator.hasNext()){ Subscription subscription = iterator.next(); if(subscription !=null && !subscription.isUnsubscribed()){ subscription.unsubscribe(); iterator.remove(); } } } }
private void unSubscribeMethodByEventType(Object subscriber, Class eventType){ List<SubscriberMethod> subscriberMethods = subscriberMethodByEventType.get(eventType); if(subscriberMethods != null){ Iterator<SubscriberMethod> iterator = subscriberMethods.iterator(); while (iterator.hasNext()){ SubscriberMethod subscriberMethod = iterator.next(); if(subscriberMethod.subscriber.equals(subscriber)){ iterator.remove(); } } } }
}
|
register的时候获取@Subscribe
注解的方法的相关信息保存到map,post事件触发的时候调用@Subscribe
注解的方法并传入参数.
unRegister的移除保存的subscriber、subscriberMethod已经Subscription取消订阅事件。
再来看使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| package com.cm.simple;
import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.view.View;
import com.cm.rxbus.RxBus; import com.cm.rxbus.Subscribe; import com.cm.rxbus.ThreadMode;
import butterknife.ButterKnife; import butterknife.OnClick;
public class MainActivity extends AppCompatActivity {
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); ButterKnife.bind(this); RxBus.getDefault().register(this); }
@Subscribe public void onEvent(String s) { Log.e("CMAD", "------>" + s); }
@Subscribe public void onEvent(EventA eventA) { Log.e("CMAD", "---onEvent EventA-->" + eventA.text); }
@Subscribe(code = 102) public void onEventWithCode(EventA eventA) { Log.e("CMAD", "---onEvent EventA 102-->" + eventA.text); }
@Subscribe(code = 103, threadMode = ThreadMode.MAIN) public void onEventWithCodeAndThreadMode(EventA eventA) { Log.e("CMAD", "---onEvent EventB 103--->" + eventA.text); }
@Override protected void onDestroy() { super.onDestroy(); RxBus.getDefault().unRegister(this); }
@OnClick({R.id.btn1, R.id.btn2, R.id.btn3, R.id.btn4}) public void onClick(View view) { switch (view.getId()) { case R.id.btn1: RxBus.getDefault().post("123456"); break; case R.id.btn2: RxBus.getDefault().post(new EventA()); break; case R.id.btn3: RxBus.getDefault().post(102, new EventA("event code 102")); break; case R.id.btn4: RxBus.getDefault().post(103, new EventA("event code 103")); break; } }
class EventA { public String text = "BeanA";
public EventA() {
}
public EventA(String text) { this.text = text; }
@Override public String toString() { return "EventA{" + "text='" + text + ''' + '}'; } } }
|
使用方法是不是跟EventBus很像,同样使用@Subscribe
注解,threadMode设置在什么线程执行.并添加了code进行相同类型事件的区分.
整个工程可以移步GitHub查看:RxBus
项目已上传到jcenter
,在Gradle中可以直接使用compile 'com.cm:rxbus:1.0'