7

Buduję aplikację na Androida, która ma określone wymagania dotyczące niskiego zużycia energii Bluetooth.RxAndroidBle utrzymywanie stałego połączenia + obsługa zapisu/powiadamiania

Muszę napisać do charakterystyki tylko do zapisu i otrzymywać odpowiedzi na osobnej charakterystyce powiadomienia i muszę to zrobić w wielu, wielu działaniach. Czy istnieje sposób Rx, aby wysłać żądanie na pierwszą cechę, czekać na odpowiedź na drugiej, a następnie przejść do innego wniosku?

Ponadto, aby udostępnić moją instancję RxAndroidBle, pomyślałem o zrobieniu jakiegoś BleManager Singleton, w którym odsłoniłbym Observables, więc mogę z łatwością zapisać się do nich w moim Presenterze. Po prostu chcę uniknąć konieczności kopiowania logiki połączenia dla każdego działania i (najlepiej) stałego połączenia. W ten sposób mogłem tylko odsłonić connectionObservable i zasubskrybować go, dzięki czemu mogę łatwo wysyłać żądania zapisu i otrzymywać powiadomienia, ale jestem pewien, że jest lepszy sposób na zrobienie tego.

To co mam teraz:

@Singleton 
public class BleManager { 

    private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create(); 
    private Observable<RxBleConnection> connectionObservable; 
    private boolean isConnected; 

    private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID"); 
    private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID"); 

    private final RxBleClient bleClient; 
    private String mMacAddress; 
    private final Context context; 
    private RxBleDevice bleDevice; 

    @Inject 
    public BleManager(Context context, RxBleClient client) { 
    Timber.d("Constructing BleManager and injecting members"); 
    this.context = context; 
    this.bleClient = client; 
    } 

    public void setMacAddress(String mMacAddress) { 
    this.mMacAddress = mMacAddress; 

    // Set the associated device on MacAddress change 
    bleDevice = bleClient.getBleDevice(this.mMacAddress); 
    } 

    public String getMacAddress() { 
    return mMacAddress; 
    } 

    public RxBleDevice getBleDevice() { 
    Preconditions.checkNotNull(mMacAddress); 
    return bleClient.getBleDevice(mMacAddress); 
    } 

    public Observable<RxBleScanResult> getScanSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleClient); 

    return bleClient.scanBleDevices().distinct(); 
    } 

    public Observable<RxBleConnection> getConnectionSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleDevice); 

    if (connectionObservable == null) { 
     connectionObservable = bleDevice.establishConnection(context, false) 
             .takeUntil(disconnectTriggerSubject) 
             .observeOn(AndroidSchedulers.mainThread()) 
             .doOnUnsubscribe(this::clearSubscription) 
             .compose(new ConnectionSharingAdapter()); 
    } 

    return connectionObservable; 
    } 

    public Observable<byte[]> setupListeners() { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID)) 
           .doOnNext(notificationObservable -> Timber.d("Notification Setup")) 
           .flatMap(notificationObservable -> notificationObservable) 
           .observeOn(AndroidSchedulers.mainThread()); 
    } 

    private void triggerDisconnect() { 
    disconnectTriggerSubject.onNext(null); 
    } 


    public Observable<byte[]> writeBytes(byte[] bytes) { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
     BLE_WRITE_CHARACTERISTIC_UUID, 
     bytes)).observeOn(AndroidSchedulers.mainThread()); 
    } 

    private boolean isConnected() { 
    return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED; 
    } 

    /** 
    * Will update the UI with the current state of the Ble Connection 
    */ 
    private void registerConnectionStateChange() { 
    bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> { 
     isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED); 
    }); 
    } 

    private void clearSubscription() { 
    connectionObservable = null; 
    } 

} 

Odpowiedz

4

myślałem trochę o korzystaniu z osobna. Dzieląc to samo połączenie, wprowadzasz stany do aplikacji, które wymagają nieco obsługi stanu, a zatem niemożliwe jest (lub przynajmniej nie mam pojęcia jak) być czysto reaktywne.

Skoncentrowałem się na nawiązywaniu połączenia i wykonywaniu transmisji z powiadomieniem o zapisie na urządzeniu BLE, które jest serializowane.

private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create(); 

private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create(); 

private Subscription connectionSubscription; 

private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time 

public void connect() { 
    Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection() 
    final UUID notificationUuid = // your notification characteristic UUID 
    final UUID writeUuid = // your write-only characteristic UUID 

    connectionSubscription = connectionObservable 
      .flatMap(
        rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications 
        (rxBleConnection, notificationObservable) -> // connection is established and notification prepared 
          inputSubject // waiting for the data-packet to transmit 
            .onBackpressureBuffer() 
            .flatMap(bytesAndFilter -> { 
               return Observable.combineLatest(// subscribe at the same time to 
                 notificationObservable.take(1), // getting the next notification bytes 
                 rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device 
                 (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes 
               ) 
                 .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier 
              }, 
              1 // serializing communication as only one Observable will be processed at the same time 
            ) 
      ) 
      .flatMap(observable -> observable) 
      .subscribe(
        response -> { /* ignored here - used only as side effect with outputSubject */ }, 
        throwable -> outputSubject.onError(throwable) 
      ); 
} 

public void disconnect() { 
    if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) { 
     connectionSubscription.unsubscribe(); 
     connectionSubscription = null; 
    } 
} 

public Observable<byte[]> writeData(byte[] data) { 
    return Observable.defer(() -> { 
       final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response 
       inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect() 
       return outputSubject 
         .filter(responseIdPair -> responseIdPair.second == uniqueId) 
         .first() 
         .map(responseIdPair -> responseIdPair.first); 
      } 
    ); 
} 

Jest to podejście, które myślę, że jest dobry, jak cały przepływ jest opisany w jednym miejscu, a więc łatwiejsze do zrozumienia. Część komunikacji, która jest stanowa (żądanie zapisu i oczekiwanie na odpowiedź) jest serializowana i ma możliwość utrzymywania połączenia aż do wywołania disconnect().

Wadą jest to, że transmisja polega na efektach ubocznych o różnym przepływie i wywoływaniu writeData() przed ustanowieniem połączenia, a konfiguracja powiadomień nigdy nie zakończy zwracanego obiektu Observable, chociaż nie powinno być problemu z dodaniem obsługi dla tego scenariusza z kontrolą stanu.

Pozdrowienia

+0

Awesome, wypróbuję Twoje rozwiązanie i dam ci znać, jak to działa. Jest to bardzo doceniane, ponieważ jestem nowy w świecie Rx i wciąż się uczę! –

+3

Czy próbowałeś zaproponowanego rozwiązania? Czy działało zgodnie z przeznaczeniem? –