
    (h:                        d dl m Z mZ d dlmZmZmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlZd dl m Z  d dlmZmZ d d	lmZmZmZ d d
lmZ d dlmZmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z&m'Z'm(Z(m)Z) d dl*m+Z+ d dl,m-Z-m.Z.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z>m?Z?m@Z@ d dlAmBZB d dlCZCd dlDZDd dlEZEd dlFmGZG dedeeHeIf   fdZJdedeeH   d eHdee    fd!ZKd"eHd#edee   fd$ZLd%ee    d&eeHef   ddfd'ZMd%ee    dee    fd(ZNd)ee    dee    fd*ZOe'fd+eeH   d%ee    d,eHd-eHd.eeH   ddfd/ZPy)0    )datetimetimezone)DictListOptionalTuple)UUID)ThreadPoolExecutor)DecimalN)r   )HTTPExceptionstatus)and_funcor_)IntegrityError)Sessionaliased)	nullslast)RollupLogCreateRollupLogUpdateRollupLogResponseRollupOutput)settings)APIException)API_PREFIXESEXPORT_DATA_SUB_FOLDERRP_HEADER_LISTsize_conversions)QueryPaginator)
RollupTypeRollupStatus	Separator)create_rollup_logupdate_rollup_log)create_rollup_output)	get_rates)get_vintage_map)get_bottle_size_map)get_retailers_map) get_codes_from_web_crawler_filesget_web_crawler_by_codeget_latest_file_content)get_mongo_id_by_wine_id)Pathdbreturnc                   K   	 t        | t        t        j                  t	        j
                  t        j                        t        j                               d{   }t        |        d{   }t        |        d{   }t        |        d{   }t        | d       d{   }t        |        d{   }g }t        |      xs d}d}	|D ]  }
t!        | |j"                  t%        d|	 d| 	      
       d{   }t'        | |
       d{   }|rwt(        j*                  }|j-                  |
j/                               }t1        | |j"                  d       d{   }|j3                  t5        | ||||||       d{          |	dz  }	 t7        ||       d{    t9        |       d{   }|j;                  d        t	        j
                         }d|j=                  d       d}t?        ||t@        t(        jB                  jD                  tF               d{    t!        | |j"                  t%        t        jH                  t	        j
                  t        j                        d|      
       d{   }tK        jL                  |      S 7 O7 ?7 /7 7 7 7 7 7 X7 77 7 7 7 ?# tN        $ r}t!        | j"                  t%        t        jP                  t	        j
                  t        j                        tS        |            
       d{  7   tU        ddtS        |      itV        jX                  d       Y d}~yd}~ww xY ww)a<  
    Execute the RP rollup process based on the provided payload.

    Args:
        db (Session): SQLAlchemy session for database interaction.
        payload (RollupOutput): The input data for the rollup process.

    Returns:
        Dict[str, any]: A dictionary containing the results of the rollup process.
    )rollup_type
date_startr   )r/   payloadNHISTORY)r/   typer   zProcessing /)message)r/   rollup_log_idr4   )r/   code)r/   web_crawler_idr6      c                 "    | j                   xs dS )N )wine_alert_id)xs    \/var/www/html/wine-match-dev/backend/winematch-backend/src/apps/rollup/services/rp_rollup.py<lambda>z#execute_rp_rollup.<locals>.<lambda>}   s    !//*?R     )keyForSalez%y.%m.%d.%H%M%Sz.txt)	file_namerollup_output_list
header_row	separatordownload_location)r   date_endr8   rF   )r   rK   r8   execute_rp_rollup	exceptionz"Error executing RP rollup process.)moduleerrorstatus_coder8   )-r#   r   r    RPr   nowr   utcr!   RUNNINGr'   r(   r)   r*   r&   lenr$   idr   r+   r"   PIPEgetlowerr,   extendprocess_rollup_outputsequalize_price"filter_by_median_price_per_wine_idsortstrftimecreate_rp_rollup_filer   TABvaluer   SUCCESSr   model_validate	ExceptionFAILEDstrr   r   HTTP_500_INTERNAL_SERVER_ERROR)r/   
new_rollupvintage_mapbottle_size_mapretailers_mapweb_crawler_codescurrency_ratesrG   total_recordsprocessing_countr:   updated_rollup_logcrawlerinput_delimiterretailerhistory_matchesfiltered_listrR   rF   finished_rollupes                        rA   rL   rL   ,   s2    u
 -#&MM#<<5#++
 

 ,B// 3B 77/33 #Cby"YY  )},-.3! %  	"D (9(mm')*:);1]OL( " 4rEEG "+..(,,TZZ\:(?2V]V`V`gp(q"q"))0'' #' 
 !A 	"D /@@@@ASTT 	@A lln cll+<=>dC	 $,%mm))4
 	
 	
 !2$--##++!hll3#		!
 	
 !//@@
 073 Z -" F #r 	AT	
	
  
$--##**!hll3A
 	
 	
 	&A'==8		
 	

s<  M7AK J,K 0J/1K J2K J5K +J8,K >J;?AK J>K KAK ,K-#K K
K /K
0K KA3K 6K7AK KK +M7,K /K 2K 5K 8K ;K >K K K K 
K K K K 	M4AM/<L?=-M/*M7/M44M7ru   rs   c                 .   K    fdt        j                  fd|D          d{   }|D cg c]  }||	 }	}t        |	D 
ci c]  }
|
j                  |
j                  f|
 c}
j                               }|S 7 Vc c}w c c}
w w)a  
    Processes rollup outputs in parallel, removing nulls and duplicates.

    Args:
        history_matches (List[str]): List of history match lines.
        input_delimiter (str): Delimiter for splitting lines.
        crawler: Crawler object.
        retailer: Retailer object.
        vintage_map: Map of vintages.
        bottle_size_map: Map of bottle sizes.

    Returns:
        List[RollupOutput]: Processed rollup outputs.
    c                   K   	 | j                  dd      } | j                        }t        |d   d d        d {   }t        |	|       d {   S 7 7 # t        $ r}t        d|        Y d }~y d }~ww xY ww)N r>   r      )r/   wine_id)web_crawlerrt   rowrj   rk   mongo_idzError processing line: )replacesplitr-   r%   re   print)
liner   r   rx   rk   rr   r/   rs   rt   rj   s
       rA   process_linez,process_rollup_outputs.<locals>.process_line   s     	<<"-D**_-C4CF3BKPPH-#!' /!   Q  	+A3/0	sP   B9A  AA  AA  BA  A   	B)A<7B<BBc              3   .   K   | ]  } |        y w)N ).0r   r   s     rA   	<genexpr>z)process_rollup_outputs.<locals>.<genexpr>   s     $TD\$%7$Ts   N)asynciogatherlistactual_retailer_descriptionpricevalues)r/   ru   rs   rr   rt   rj   rk   resultsresultnew_listitemunique_listr   s   ` `````     @rA   r[   r[      s     . & NN$TO$TUUG &-C60BCHCYabQU994::FLbiiklK V Dbs2   )BB	BBBB B2B
Bbottle_sizer   c                    K   | r|y	 t        d      }|}| j                         } | t        v rt        |    |      }|j                  t        d            S # t        $ r}t        d|        Y d}~yd}~ww xY ww)z6
    Standardizes prices to a 750 ml bottle size.
    N7500.01zError standardizing price: )r   rY   r   quantizere   r   )r   r   standard_size	new_pricerx   s        rA   standardize_pricer      s      %-	!'') **(5e<I!!'&/22 +A3/0s/   A:AA A:	A7A2-A:2A77A:rG   rn   c                   K   | D ]  }|j                   	 |j                  xs d}|j                  |t        d            }t        t	        |j                               j                  t        d      d      |z  }t        |j                  |       d{   |_         y7 # t        $ r}t        d|        d|_        Y d}~d}~ww xY ww)za
    Equalizes the price of rollup outputs by converting to USD and standardizing to 750 ml.
    NUSDz1.0r   ROUND_CEILING)roundingz,Warning: Error equalizing price for output: )r   currencyrX   r   rg   r   r   r   equalized_pricere   r   )rG   rn   outputr   currency_rateprice_in_usdrx   s          rA   r\   r\      s      % .<<#.!??3e . 2 28WU^ L  's6<<'89BBFO,  C   " " 0AASASUa/b)b&. *c .DQCHI)-&&.sA   CA?B$B"	B$C"B$$	C-CCCCc                   K   g }i }| D ]/  }|j                   xs d}||vrg ||<   ||   j                  |       1 t        d      }|j                         D ]  \  }}g }t	        |      dk(  r7|D cg c]+  }|j
                  r|j                  r|j                  |k\  r|- }}nt        |       d{   }t	        |      dkD  so|j                  |        |S c c}w 7 -w)z
    Filters rollup outputs by median price per wine ID.
    
    Args:
        rollup_output_list (List[RollupOutput]): List of rollup outputs to filter.
        
    Returns:
        List[RollupOutput]: Filtered list of rollup outputs.
    r>   5.5r<   Nr   )	r}   appendr   itemsrU   listingr   filter_by_median_pricerZ   )rG   r   groupedr   r}   
low_filter	wine_listrv   s           rA   r]   r]     s     F G" &,,$"'!!GG%	& J%mmo )y>Q
 "+||(<(<AUAUYcAc M  #9"CCM}!MM-(!)$ M Ds$   A.C!00C C!1C2C!C!r   c                   K   g }t        d      }g }| D ]Y  }|j                  |j                  |k\  s |j                  s-|j                  t        t	        |j                                     [ t        |      dkD  r	 |j                          t        |      dz  }t        |      dz  dk7  r||   }n||   ||dz
     z   t        d      z  }|t        d      z  }|t        d      z  }| D cg c]:  }|j                  r*|j                   |j                  |k  r|j                  |k\  r|< }}|S | D cg c]+  }|j                  r|j                  r|j                  |k\  r|- }}|S c c}w # t        $ rU}	t        d|	        | D cg c]+  }|j                  r|j                  r|j                  |k\  r|- nc c}w }}Y d}	~	|S d}	~	ww xY wc c}w w)	z
    Filters a list of wines by median price logic.
    
    Args:
        wine_list (List[RollupOutput]): List of wines with the same wine_id.
        
    Returns:
        List[RollupOutput]: Filtered list based on median price logic.
    r   Nr      r<   24z)Warning: Error calculating median price: )	r   r   r   r   rg   rU   r^   re   r   )
r   rv   r   
price_listr   
mid_numbermedian	max_price	min_pricerx   s
             rA   r   r   =  s     M JJ B  ,  J.LLgc$*>*>&?@A	B :	OOZA-J:"a'#J/$Z0:j1n3MMQXY\Q]]
 -I-I "+))5))Y6))Y6	 M * 	 '
<<D$8$8T=Q=QU_=_ 
 

 +  	=aSAB "+||(<(<AUAUYcAc  M  	
sj   !GGG=G?A/E. .?E)-E. /G50G%G)E. .	G7G	0F:9GGGGrH   rI   rF   rJ   c                 b  K   	 d}g }|j                  |j                  |       |z          |D ]4  }|j                  |j                  |t        j                        |z          6 t
        j                  j                  ||      }t        j                  t
        j                  j                  |      d       t        j                  |dd      4 d{   }	|	j                  dj                  |             d{    ddd      d{    t        |      j                  t        j                  t        j                   z  t        j"                  z  t        j$                  z         y7 7 r7 d# 1 d{  7  sw Y   txY w# t&        $ r}
t)        d	| d
|
        Y d}
~
yd}
~
ww xY ww)a  
    Creates an RP rollup file with the given data and sets file permissions.

    Args:
        header_row (List[str]): The header row for the file.
        rollup_output_list (List[RollupOutput]): The list of rollup outputs to write.
        separator (str): The separator to use in the file.
        file_name (str): The name of the file to create.
        download_location (str): The directory where the file will be saved.

    Returns:
        None
    
T)exist_okwzutf-8)modeencodingNr>   zError creating RP rollup file z: )r   join	to_stringr    rQ   ospathmakedirsdirnameaiofilesopenwriter.   chmodstatS_IRUSRS_IWUSRS_IRGRPS_IROTHre   r   )rH   rG   rI   rF   rJ   line_terminatorsbr   	file_pathfilerx   s              rA   r`   r`   |  sY    (A 			)..,>? ) 	TFIIf&&y*--@?RS	T GGLL!2I>	
BGGOOI.> ==wG 	* 	*4**RWWR[)))	* 	* 	YdllT\\9DLLH4<<WX		*)	* 	* 	* 	*
  A.ykA3?@@As   F/CF E-F $E3<E/=E3F E1AF ,F/-F /E31F 3F9E<:FF 	F,F'"F/'F,,F/)Qr   r   typingr   r   r   r   uuidr	   concurrent.futuresr
   decimalr   r   fastapir   r   
sqlalchemyr   r   r   sqlalchemy.excr   sqlalchemy.ormr   r   sqlalchemy.sql.expressionr   src.apps.rollup.schemas.rollupr   r   r   r   src.core.configr   src.core.exceptionsr   src.utils.constantsr   r   r   r   src.utils.paginationr   src.utils.enumsr    r!   r"   #src.apps.rollup.services.rollup_logr#   r$   &src.apps.rollup.services.rollup_outputr%   #src.apps.currency.services.currencyr&   &src.apps.wine.vintage.services.vintager'   .src.apps.wine.bottle_size.services.bottle_sizer(   (src.apps.wine.retailer.services.retailerr)   )src.apps.web_crawler.services.web_crawlerr*   r+   r,    src.apps.wine.wine.services.winer-   r   r   r   pathlibr.   rg   anyrL   r[   r   r\   r]   r   r`   r   rC   rA   <module>r      s   ' . .  1    ) & & ) + /  % , f f / ? ? H 9 B N F 
 E  	  B
B
	#s(^B
H00#Y0 0 
,0f W 'AR ,.T,-? .QUVY[bVbQc .hl .0)lAS )X\]iXj )X<D,> <4CU <H (>+AS	+A\*+A +A 	+A
  }+A 
+ArC   