
Understanding the Observable.create method

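You can create your own Observable at any time with the Observable.create method. It takes an ObservableOnSubscribe<T> (in Kotlin, usually written as a lambda) whose single method receives an ObservableEmitter<T>; you call onNext, onError, and onComplete on that emitter to push notifications to the Observer. Consider the following example: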

    // RxJava 2 package names are assumed here; RxJava 3 uses io.reactivex.rxjava3.*
    import io.reactivex.Observable
    import io.reactivex.Observer
    import io.reactivex.disposables.Disposable

    fun main(args: Array<String>) {

      // A single Observer that logs every notification it receives
      val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
          println("All Completed")
        }

        override fun onNext(item: String) {
          println("Next $item")
        }

        override fun onError(e: Throwable) {
          println("Error Occurred ${e.message}")
        }

        override fun onSubscribe(d: Disposable) {
          println("New Subscription ")
        }
      }//Create Observer

      // Emits four items, then completes normally
      val observable: Observable<String> = Observable.create<String> {//1
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onComplete()
      }

      observable.subscribe(observer)

      // Emits four items, then terminates with an error
      val observable2: Observable<String> = Observable.create<String> {//2
        it.onNext("Emit 1")
        it.onNext("Emit 2")
        it.onNext("Emit 3")
        it.onNext("Emit 4")
        it.onError(Exception("My Custom Exception"))
      }

      observable2.subscribe(observer)
    }

First, we created an instance of the Observer interface, as in the previous example. I will not elaborate on the observer here, as we have already seen an overview of it, and we will cover it in detail later in this chapter.

At comment 1, we created an Observable with the Observable.create method. We emitted four strings from it with the onNext method, and then signaled completion with the onComplete method.

At comment 2, we did almost the same, except that instead of calling onComplete, we called onError with a custom Exception.

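When run, the program prints "New Subscription" followed by four "Next Emit n" lines for each subscription. The first subscription then ends with "All Completed", while the second ends with the error message carrying "My Custom Exception".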

The Observable.create method is especially useful when you are working with a custom data structure and want control over which values are emitted, and when. You can also emit values to the Observer from a different thread.
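As an illustration of the custom data structure case, here is a minimal sketch that wraps a hand-rolled linked list in an Observable. The Node class and the observeNodes function are hypothetical names invented for this example, and the imports assume RxJava 2 package names. The loop checks isDisposed so that emission stops if the Observer has already disposed the subscription.

```kotlin
import io.reactivex.Observable

// Hypothetical singly linked list node, used only for illustration.
class Node<T>(val value: T, val next: Node<T>? = null)

// Wraps the custom structure in an Observable. We decide exactly which
// values are emitted and stop early if the downstream has disposed.
fun <T : Any> observeNodes(head: Node<T>?): Observable<T> =
    Observable.create<T> { emitter ->
        var current = head
        while (current != null && !emitter.isDisposed) {
            emitter.onNext(current.value)
            current = current.next
        }
        if (!emitter.isDisposed) {
            emitter.onComplete()
        }
    }

fun main() {
    val list = Node("A", Node("B", Node("C")))

    observeNodes(list).subscribe(
        { println("Next $it") },
        { println("Error Occurred ${it.message}") },
        { println("All Completed") }
    )
}
```

Because the emission happens synchronously inside subscribe, the values arrive in list order and the completion signal follows the last node.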

Note that the Observable contract (http://reactivex.io/documentation/contract.html) states that an Observable must issue notifications to observers serially (not in parallel). It may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
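One way to respect this rule when several threads feed the same emitter is the serialize() method of ObservableEmitter, which returns an emitter that serializes calls to onNext, onError, and onComplete. The following is only a sketch under assumptions: the serializedSource function and the two worker names are made up for illustration, and the imports assume RxJava 2 package names.

```kotlin
import io.reactivex.Observable
import kotlin.concurrent.thread

// Hypothetical source that emits from two worker threads at once.
fun serializedSource(): Observable<String> =
    Observable.create<String> { emitter ->
        // serialize() guards against overlapping onNext calls from
        // different threads, so notifications reach the Observer one at a time.
        val safeEmitter = emitter.serialize()

        val workers = listOf("A", "B").map { name ->
            thread {
                for (i in 1..3) {
                    safeEmitter.onNext("$name$i")
                }
            }
        }

        // Wait for both workers before signaling completion, so that
        // onComplete is the last notification.
        workers.forEach { it.join() }
        safeEmitter.onComplete()
    }

fun main() {
    serializedSource().subscribe(
        { println("Next $it") },
        { println("Error Occurred ${it.message}") },
        { println("All Completed") }
    )
}
```

The interleaving of the A and B items can differ from run to run. What the serialized emitter provides is that the notifications never overlap, and each worker's own items keep their relative order.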
