Merge branch 'release/2.1.0' into develop
[oweals/peertube.git] / client / src / app / shared / user-subscription / user-subscription.service.ts
1 import { bufferTime, catchError, filter, map, observeOn, share, switchMap, tap } from 'rxjs/operators'
2 import { asyncScheduler, merge, Observable, of, ReplaySubject, Subject } from 'rxjs'
3 import { HttpClient, HttpParams } from '@angular/common/http'
4 import { Injectable, NgZone } from '@angular/core'
5 import { ResultList } from '../../../../../shared'
6 import { environment } from '../../../environments/environment'
7 import { RestExtractor, RestService } from '../rest'
8 import { VideoChannel } from '@app/shared/video-channel/video-channel.model'
9 import { VideoChannelService } from '@app/shared/video-channel/video-channel.service'
10 import { VideoChannel as VideoChannelServer } from '../../../../../shared/models/videos'
11 import { ComponentPaginationLight } from '@app/shared/rest/component-pagination.model'
12 import { uniq } from 'lodash-es'
13 import * as debug from 'debug'
14 import { enterZone, leaveZone } from '@app/shared/rxjs/zone'
15
// Debug logger for this service, created under its own namespace
const logger = debug('peertube:subscriptions:UserSubscriptionService')

// Maps a subscription URI to whether the subscription exists
type SubscriptionExistResult = Record<string, boolean>
// Maps a subscription URI to an observable of whether the subscription exists
type SubscriptionExistResultObservable = Record<string, Observable<boolean>>
20
/**
 * Talks to the "users/me/subscriptions" REST endpoints and keeps an in-memory
 * cache of the subscription state of the URIs that were checked or changed.
 *
 * Existence checks are batched: URIs pushed by doesSubscriptionExist() are
 * buffered for 500ms, de-duplicated and resolved with one "/exist" request.
 */
@Injectable()
export class UserSubscriptionService {
  static BASE_USER_SUBSCRIPTIONS_URL = environment.apiUrl + '/api/v1/users/me/subscriptions'

  // Use a replay subject because we "next" a value before subscribing
  private readonly existsSubject = new ReplaySubject<string>(1)
  // Emits batched "/exist" results as well as the cache updates pushed by add/deleteSubscription()
  private readonly existsObservable: Observable<SubscriptionExistResult>

  // Last known subscription state per URI (reassigned by doSubscriptionsExist())
  private myAccountSubscriptionCache: SubscriptionExistResult = {}
  // Per-URI observables already built by listenToSubscriptionCacheChange()
  private readonly myAccountSubscriptionCacheObservable: SubscriptionExistResultObservable = {}
  // Emits the whole cache each time add/deleteSubscription() gets a successful response
  private readonly myAccountSubscriptionCacheSubject = new Subject<SubscriptionExistResult>()

  constructor (
    private authHttp: HttpClient,
    private restExtractor: RestExtractor,
    private restService: RestService,
    private ngZone: NgZone
  ) {
    this.existsObservable = merge(
      this.existsSubject.pipe(
        // Group the URIs requested during a 500ms window into one array
        // We leave Angular zone so Protractor does not get stuck
        bufferTime(500, leaveZone(this.ngZone, asyncScheduler)),
        // bufferTime also emits empty arrays when nothing was requested: skip them
        filter(uris => uris.length !== 0),
        // The same URI may have been requested several times in the window
        map(uris => uniq(uris)),
        // NOTE(review): enterZone presumably brings the rest of the pipeline back inside the Angular zone
        observeOn(enterZone(this.ngZone, asyncScheduler)),
        // A new batch replaces the request of the previous batch if it is still pending
        switchMap(uris => this.doSubscriptionsExist(uris)),
        // All subscribers share the same batching pipeline (and so the same HTTP requests)
        share()
      ),

      this.myAccountSubscriptionCacheSubject
    )
  }

  /**
   * Subscription part
   */

  /**
   * Sends a DELETE request for the subscription identified by nameWithHost.
   * On success, marks the URI as not subscribed in the cache and emits the
   * cache on myAccountSubscriptionCacheSubject.
   *
   * @param nameWithHost identifier appended to the subscriptions URL and used as cache key
   * @returns the response mapped by restExtractor.extractDataBool; errors go through restExtractor.handleError
   */
  deleteSubscription (nameWithHost: string) {
    const url = UserSubscriptionService.BASE_USER_SUBSCRIPTIONS_URL + '/' + nameWithHost

    return this.authHttp.delete(url)
               .pipe(
                 map(this.restExtractor.extractDataBool),
                 tap(() => {
                   this.myAccountSubscriptionCache[nameWithHost] = false

                   this.myAccountSubscriptionCacheSubject.next(this.myAccountSubscriptionCache)
                 }),
                 catchError(err => this.restExtractor.handleError(err))
               )
  }

  /**
   * Sends a POST request with `{ uri: nameWithHost }` to create a subscription.
   * On success, marks the URI as subscribed in the cache and emits the cache
   * on myAccountSubscriptionCacheSubject.
   *
   * @param nameWithHost identifier sent as the `uri` body field and used as cache key
   * @returns the response mapped by restExtractor.extractDataBool; errors go through restExtractor.handleError
   */
  addSubscription (nameWithHost: string) {
    const url = UserSubscriptionService.BASE_USER_SUBSCRIPTIONS_URL

    const body = { uri: nameWithHost }
    return this.authHttp.post(url, body)
               .pipe(
                 map(this.restExtractor.extractDataBool),
                 tap(() => {
                   this.myAccountSubscriptionCache[nameWithHost] = true

                   this.myAccountSubscriptionCacheSubject.next(this.myAccountSubscriptionCache)
                 }),
                 catchError(err => this.restExtractor.handleError(err))
               )
  }

  /**
   * Fetches one page of the subscriptions of the current user.
   *
   * @param componentPagination pagination state, converted to REST pagination query params by restService
   * @returns the server result converted by VideoChannelService.extractVideoChannels
   */
  listSubscriptions (componentPagination: ComponentPaginationLight): Observable<ResultList<VideoChannel>> {
    const url = UserSubscriptionService.BASE_USER_SUBSCRIPTIONS_URL

    const pagination = this.restService.componentPaginationToRestPagination(componentPagination)

    let params = new HttpParams()
    params = this.restService.addRestGetParams(params, pagination)

    return this.authHttp.get<ResultList<VideoChannelServer>>(url, { params })
               .pipe(
                 map(res => VideoChannelService.extractVideoChannels(res)),
                 catchError(err => this.restExtractor.handleError(err))
               )
  }

  /**
   * SubscriptionExist part
   */

  /**
   * @returns an observable emitting the whole cache after each successful
   * addSubscription()/deleteSubscription() call
   */
  listenToMyAccountSubscriptionCacheSubject () {
    return this.myAccountSubscriptionCacheSubject.asObservable()
  }

  /**
   * Returns an observable of the subscription state of nameWithHost, fed by
   * existsObservable. The observable is built once per URI and reused on
   * later calls.
   *
   * @param nameWithHost URI to watch
   */
  listenToSubscriptionCacheChange (nameWithHost: string) {
    if (nameWithHost in this.myAccountSubscriptionCacheObservable) {
      return this.myAccountSubscriptionCacheObservable[ nameWithHost ]
    }

    const obs = this.existsObservable
                    .pipe(
                      // Ignore results that do not mention this URI
                      filter(existsResult => existsResult[ nameWithHost ] !== undefined),
                      map(existsResult => existsResult[ nameWithHost ])
                    )

    this.myAccountSubscriptionCacheObservable[ nameWithHost ] = obs
    return obs
  }

  /**
   * Tells whether a subscription exists for nameWithHost.
   *
   * If the URI is already in the cache, the cached value is emitted directly.
   * Otherwise the URI is queued for the next batched "/exist" request and the
   * returned observable emits every later result that mentions this URI
   * (it is not limited to the first value), storing each one in the cache.
   *
   * @param nameWithHost URI to check
   */
  doesSubscriptionExist (nameWithHost: string) {
    // nameWithHost is a string: use the %s formatter (%d is the number formatter)
    logger('Running subscription check for %s.', nameWithHost)

    if (nameWithHost in this.myAccountSubscriptionCache) {
      logger('Found cache for %s.', nameWithHost)

      return of(this.myAccountSubscriptionCache[ nameWithHost ])
    }

    // Queue the URI: the pipeline built in the constructor will batch it
    this.existsSubject.next(nameWithHost)

    logger('Fetching from network for %s.', nameWithHost)
    return this.existsObservable.pipe(
      filter(existsResult => existsResult[ nameWithHost ] !== undefined),
      map(existsResult => existsResult[ nameWithHost ]),
      tap(result => this.myAccountSubscriptionCache[ nameWithHost ] = result)
    )
  }

  /**
   * Asks the "/exist" endpoint about several URIs at once and merges the
   * response into the cache (a new cache object is created).
   *
   * @param uris URIs to check, sent as query params built by restService.addObjectParams
   * @returns the server response; errors go through restExtractor.handleError
   */
  private doSubscriptionsExist (uris: string[]): Observable<SubscriptionExistResult> {
    const url = UserSubscriptionService.BASE_USER_SUBSCRIPTIONS_URL + '/exist'
    let params = new HttpParams()

    params = this.restService.addObjectParams(params, { uris })

    return this.authHttp.get<SubscriptionExistResult>(url, { params })
               .pipe(
                 tap(res => {
                   this.myAccountSubscriptionCache = {
                     ...this.myAccountSubscriptionCache,
                     ...res
                   }
                 }),
                 catchError(err => this.restExtractor.handleError(err))
               )
  }
}