RxJS는 Reactive Extensions For JavaScript 의 약자로 reactive programming을 JS에서 지원하기 위해 만들어진 라이브러리다.

reactive programming: 데이터의 흐름과 변화에 반응하는 프로그래밍 패러다임이다. 
이 패러다임은 데이터의 변화를 이벤트로 간주하고, 이러한 이벤트에 대한 반응을 정의함으로써 시스템을 구축한다. 
특히 비동기적인 데이터 스트림을 처리하는 데 유용하며, 주로 사용자 인터페이스, 실시간 데이터 처리, 비동기 IO 작업 등에 적용된다.

이 리액티브 프로그래밍은 Push 시나리오 방식으로 외부와 통신한다.

Push 시나리오: 외부에서 명령하면서 응답이 오면 그때 반응하여 처리한다. 
    데이터를 가지고 오기 위해서는 subscribe를 해야한다.

RxJS는 이러한 비동기 이벤트 기반의 프로그램 작성을 돕기 위해 함수형 프로그래밍을 이용해 이벤트 스트림을 Observable이라는 객체로 표현한다.


Observable은 event가 흐르는 stream이다. Observable은 누군가 구독(subscribe)을 해야 event를 발행(publish)한다.
Observer가 Observable을 구독하면서 next, error, complete 키워드를 사용해 Observable에 흐르는 event를 처리한다.

    const observable$ = interval(1000).pipe(take(4));

Observable 변수에 붙은 $(달러) 표시는 Observable을 나타내는 코드 컨벤션이다.
interval()은 정의된 시간마다 증가하는 연속값을 stream에 발생시키고,
pipe() operator를 사용하여 Observable stream 내부에서 적용할 operator를 처리하게 된다.
take는 발생시킨 이벤트 중 처음부터 n개까지의 이벤트만 받는다.

ReactX는 크게 세 요소로 구성된다.

  1. Observable:
    일련의 값들을 발행한다. 관찰될 수 있는 것, 관찰되는 대상이란 뜻. 아래의 코드에서는 1에서 20까지의 정수를 반환한다.
    이렇게 연속적으로 발행되어 나오는 값들을 흐름, stream이라고 부른다.
  2. Operator:
    순수 함수들의 모임
  3. Observer:
    파이프만 쳐다보며 값을 기다리다가 뭔가 나오는 대로 최종 작업을 시행한다.
    옵저버가 파이프를 주시하며 발행물을 기다리는 것을 ReactiveX에서는 subscribe, 구독한다고 표현한다.
    구독자가 발행물들에 '반응'하는 것 이다.
const {range} = rxjs
cosnt {filter, take, map, toArray} = rxjs.operators

range(1,20)
    .pipe(
        filter(n => n % 2 === 0),
        take(5),
        map(n => Math.pow(n,2)),
        toArray(),
        map(arr => arr.join(', '))
    )
    .subscribe(console.log)
  1. Operator
    Operator는 Observable에서 각 이벤트들에 대해 연산을 할 수 있는 pure function이다.
    앞서 언급한 것처럼, RxJS는 함수형 프로그래밍에 영향을 많이 받아 이러한 pure function들이 많이 존재한다.
    대표적으로 tap(), filter(), min(), max()와 같은 operator가 존재한다.

    • tab(): 데이터 스트림의 요소에 대해 부수적인 작업을 수행할 때 사용된다. 스트림의 요소를 변경하지 않고, 디버깅이나 로깅 등에 유용하다.

      import { of } from 'rxjs';
      import { tap } from 'rxjs/operators';
      
      // of : RxJS에서 제공하는 함수로, 주어진 인수들을 순차적으로 방출하는 Observable을 생성한다.
      of(1, 2, 3).pipe(
      tap(x => console.log(`Tapped: ${x}`))
      ).subscribe();
    • filter(): 조건을 만족하는 요소만 스트림에 남겨둔다. 스트림의 각 요소에 대해 조건을 평가하여 true인 경우만 방출

      import { of } from 'rxjs';
      import { filter } from 'rxjs/operators';
      
      of(1, 2, 3, 4).pipe(
        filter(x => x % 2 === 0)
      ).subscribe(console.log); // 2, 4
    • min(): 스트림에서 최소값을 찾는다. 스트림의 모든 요소를 비교하여 가장 작은 값을 방출

       import { of } from 'rxjs';
       import { min } from 'rxjs/operators';
      
       of(5, 3, 9, 1).pipe(
         min()
       ).subscribe(console.log); // 1
    • max(): 스트림에서 최대값을 찾는다. 스트림의 모든 요소를 비교하여 가장 큰 값을 방출

       import { of } from 'rxjs';
       import { max } from 'rxjs/operators';
    
       of(5, 3, 9, 1).pipe(
         max()
       ).subscribe(console.log); // 9
  2. Observer
    Observer는 Observable을 구독하는 대상이다. Observer를 정의하고
    next, error, complete 세 가지를 정의해 주고 Observable에 구독을 하면 완성이다.

    • next: Observable에 들어오는 event를 처리한다.

    • error: Observable에서 error가 발생했을 때 event를 처리해 준다.

    • complete: Observable이 종료되면 complete가 호출되게 된다.

      예1

        const observable$ = interval(1000).pipe(take(4));
      
        const observer = {
            next: (item: number) => console.log(item),
            error: (error: Error) => console.log(err),
              complete: () => console.log('complete'),
        };
      
        observable$.subscribe(observer);

      예2

        import {fromEvent} from 'rxjs'
      
        // fromEvent를 이용해서 이벤트를 observable 객체로 만든다.
        // dom 요소와 click과 같은 이벤트
        const observable = fromEvent(document, 'click');
        // subscriber(이벤트핸들러)를 정의
        const subscriber = () => console.log('Clicked!');
        // observable가 subscribe를 사용해 이벤트 핸들러인 subscriber을 구독하도록 한다.
        observable.subscribe(subscriber)
    1. Observable
      Observable 객체는 특정 객체를 관찰하는 이벤트 핸들러인 Subscriber에게 여러 이벤트나 값을 보내는 역할을 한다.
아래의 코드는 RxJS의 Observable을 직접 생성하고 구독하는 예제이다.
observable이 subscriber에게 연속으로 1,2,3을 방출하고 1초 후에 4를 방출하도록 되어있다.  

```javascript
    const observable = new Observable(subscriber => {
    subscriber.next(1)  
    subscriber.next(2)
    subscriber.next(3)
    setTimeout(() => { 
      subscriber.next(4)
      subscriber.complete() // 스트림을 완료한다. 
    }, 1000)
  })

   // 1.
  console.log('just before subscribe')
  observable.subscribe({
    next(x) { // 새로운 값이 방출될 때 마다 호출된다.
      // 2.
      console.log('got value ' + x)
    },
    error(err) { 
      console.error('something wrong occurred: ' + err)
    },
    complete() { // 스트림이 완료될 때 호출된다.
      // 4
      console.log('done')
    },
  })
    // 3
  console.log('just after subscribe')
```

출처

https://www.youtube.com/watch?v=KDiE5qQ3bZI
https://velog.io/@stop7089/NestJS-%EC%99%80-RxJS

'NestJS > 모르는 것들' 카테고리의 다른 글

Guard, Strategy, Passport, Jwt  (0) 2024.06.25

+ Recent posts