import { useContext } from '@marblejs/core';
import { r, HttpRequest, HttpRequestBusToken, HttpStatus } from '@marblejs/http';
import { from, Observable, of } from 'rxjs';
import { bufferWhen, catchError, filter, map, mergeMap } from 'rxjs/operators';
/**
 * Narrows a request stream down to the requests that signal a flush.
 *
 * A request qualifies when its method is `GET` and its URL is exactly
 * `/flush`; every other request is dropped.
 *
 * @param req$ - Stream of HTTP requests to inspect.
 * @returns A stream emitting only the `GET /flush` requests seen on `req$`.
 */
const getFlush$ = (req$: Observable<HttpRequest>): Observable<HttpRequest> =>
  // The operator has to be applied to the source stream: a bare `filter(...)`
  // call yields an operator function, not the Observable promised by the
  // return type.
  req$.pipe(
    filter(req => req.method === 'GET' && req.url === '/flush'),
  );
r.applyMeta({ continuous: true }),
r.useEffect((req$, ctx) => {
const reqBus$ = useContext(HttpRequestBusToken)(ctx.ask);
const terminate$ = getFlush$(reqBus$);
bufferWhen(() => terminate$),
mergeMap(buffer => from(buffer)),
mergeMap(request => processData(request).pipe(
map(body => ({ body, request })),
status: HttpStatus.BAD_REQUEST,
body: { error: { message: error.message }}