
    >iA                    8   d Z ddlmZ ddlZddlmZmZmZ ddlmZ ddl	Z	ddl	m
Z
mZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dZ dZ! G d d      Z" G d dejF                        Z# G d dejH                        Z$y)a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    )annotationsN)datetime	timedeltatimezone)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property)maybe_sanitize_url   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                  :    e Zd ZdZd Zd Zd Zd	dZd Zd Z	e	Z
y)
BroadcastCursorzCursor for broadcast queues.c                D    || _         d| _        | j                  d       y )Nr   F)rewind)_cursor_offsetpurge)selfcursors     s/var/www/html/wine-match-dev/backend/winematch-backend/venv/lib/python3.12/site-packages/kombu/transport/mongodb.py__init__zBroadcastCursor.__init__D   s    

%
     c                f    | j                   j                  j                  i       | j                  z
  S N)r   
collectioncount_documentsr   r   s    r    get_sizezBroadcastCursor.get_sizeI   s&    ||&&66r:T\\IIr"   c                8    | j                   j                          y r$   )r   closer'   s    r    r*   zBroadcastCursor.closeL   s    r"   c                    |r| j                   j                          | j                   j                  j                  i       | _        | j                   j                  | j                        | _         y r$   )r   r   r%   r&   r   skip)r   r   s     r    r   zBroadcastCursor.purgeO   sM    LL! ||..>>rB||((6r"   c                    | S r$    r'   s    r    __iter__zBroadcastCursor.__iter__W   s    r"   c                    	 	 t        | j                        }	 | xj                  dz  c_        |S # t        j                  j                  $ r(}dt        |      v r| j                          Y d }~n d }~ww xY w)Nznot valid at serverr   )nextr   pymongor	   OperationFailurestrr   r   )r   msgexcs      r    __next__zBroadcastCursor.__next__Z   si    4<<( 
 >>22  )CH4JJLs   0 A5A0/A00A5N)T)__name__
__module____qualname____doc__r!   r(   r*   r   r/   r7   r1   r.   r"   r    r   r   A   s+    &!
J7& Dr"   r   c                  l    e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZej&                  j(                  dz   Z fdZd Zd Z fdZd Zd Zd Zd Zd Z fdZd*dZd Z d Z!d*dZ"d Z#d Z$d Z%e&d        Z'e&d         Z(e&d!        Z)e&d"        Z*e&d#        Z+d$ Z,d% Z-d& Z.d' Z/d( Z0d) Z1 xZ2S )+ChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                J    t        |   |i | i | _        | j                   y r$   )superr!   _broadcast_cursorsclient)r   vargskwargs	__class__s      r    r!   zChannel.__init__   s&    %*6*"$ 	r"   c           
         | j                   r7| j                  j                  d|id||| j                  |d      did       y y )N_id$set	x-expires)rT   options	expire_atTupsert)rB   queues
update_one_get_queue_expire)r   queuerQ   s      r    
_new_queuezChannel._new_queue   sW    88KK""$#)%)%;%;"K&  #  r"   c                X   || j                   v r	 t        | j                  |            }n0| j                  j                  d|idt        j                  fg      }| j                  r| j                  |       |
t               t        t        |d               S # t        $ r d }Y Mw xY w)Nr^   priority)sortpayload)_fanout_queuesr1   _get_broadcast_cursorStopIterationr?   find_one_and_deleter2   	ASCENDINGrB   _update_queues_expirer   r   r   )r   r^   r5   s      r    _getzChannel._get   s    D'''455e<= --33% !7#4#456 4 C
 88&&u-;'M\#i.122 ! s   B B)(B)c                    | j                   st        | 	  |      S || j                  v r| j	                  |      j                         S | j                  j                  d|i      S Nr^   )rK   rM   _sizerd   re   r(   r?   r&   )r   r^   rR   s     r    rm   zChannel._size   s_     ##7=''D'''--e4==??}},,gu-=>>r"   c                   t        |      || j                  |d      d}| j                  r:| j                  |d      |d<   | j	                  |      }||d   ||d   k  r||d<   | j
                  j                  |       y )NT)reverse)rc   r^   ra   zx-message-ttlrX   )r   _get_message_priorityrB   r]   _get_message_expirer?   
insert_one)r   r^   messagerQ   data
msg_expires         r    _putzChannel._put   s    W~227D2I
 88 $ 6 6uo ND11':J%[!)Z${:K-K$.[!  &r"   c                R    | j                   j                  t        |      |d       y )N)rc   r^   )	broadcastrr   r   )r   exchangers   routing_keyrQ   s        r    _put_fanoutzChannel._put_fanout   s"    !!eGn+3#5 	6r"   c                    | j                  |      }|| j                  v r!| j                  |      j                          |S | j                  j                  d|i       |S rl   )rm   rd   re   r   r?   delete_many)r   r^   sizes      r    _purgezChannel._purge   sZ    zz% D'''&&u-335  MM%%w&67r"   c                    t        | j                  j                  |   d         }| j                  j	                  d|i      }|t        d |D              z  S )Ntablery   c              3  8   K   | ]  }|d    |d   |d   f  yw)rz   patternr^   Nr.   ).0rs     r    	<genexpr>z$Channel.get_table.<locals>.<genexpr>   s,      '
 }q|QwZ8'
s   )	frozensetstate	exchangesroutingfind)r   ry   localRoutesbrokerRoutess       r    	get_tablezChannel.get_table   s_    

 4 4X >w GH||(("
 Y '
!'
 
 
 	
r"   c                6   | j                  |      j                  dk(  r#| j                  ||||       || j                  |<   ||||d}|j	                         }| j
                  r| j                  |d      |d<   | j                  j                  |d|id       y )Nfanout)ry   r^   rz   r   rV   rX   rU   TrY   )	typeoftype_create_broadcast_cursorrd   copyrB   r]   r   r\   )r   ry   rz   r   r^   lookuprt   s          r    _queue_bindzChannel._queue_bind   s    ;;x %%1))+w7)1D& !&	
 {{}88 $ 6 6uk JDtDr"   c                |   | j                   j                  d|i       | j                  r| j                  j	                  d|i       t        |   |fi | || j                  v rH	 | j                  j                  |      }|j                          | j                  j                  |       y y # t        $ r Y y w xY w)Nr^   rT   )r   r}   rB   r[   
delete_onerM   queue_deleterd   rN   popr*   KeyError)r   r^   rQ   r   rR   s       r    r   zChannel.queue_delete  s      '5!1288KK""E5>2U-f-D'''/0044U; ##''. (  s   'B/ /	B;:B;c                   | j                   j                  }|j                  }|j                  d      rd}d|z   }|j                  |      s||z   }|t	        |      d  s|| j
                  z  }|j                  rPd|vrL|j                  d      \  }}|j                  }|j                  r|d|j                  z   z  }|dz   |z   dz   |z   }|j                  r|j                  n| j                  }t        j                  ||d      }|d	   xs |j                  }	|	d
v r| j                  }	d| j                  | j                   rt#        | j                   dz        nd d}
|
j%                  |d          i }|
j'                         D ]  \  }}t)        |t*              rt	        |      dk(  r|d   n|}|||<   |j-                         }||vs||   |k(  r|||<   R||   |k(  r[t/        j0                  d| d| d|j3                  |      d|d| d        |}
| j5                  |
      }
d|
v r|
j7                  d       ||	|
fS )Nzsrv://zmongodb+srv://zmongodb+@z://:F)validatedatabase)/NTi  )auto_start_requestrA   connectTimeoutMSrW   r   r   z,MongoDB transport: Option conflict for key 'z' and 'z' with different values: z vs z. Using value for 'z'.tlsrA   )
connectionrO   hostname
startswithlenrD   useridsplitpasswordportrE   r
   	parse_urivirtual_hostrF   rA   r@   intupdateitems
isinstancelistlowerwarningswarnget_prepare_client_optionsr   )r   schemerO   r   headtailcredentialsr   parseddbnamerW   
normalizedkvvallks                   r    
_parse_urizChannel._parse_uri  s[    ''??x(%F!H,H""6*(HF%---H==S0!.JD$ --KsV__44e|k1C7$>H$kkv{{t/@/@ %%huE
#:v':':[ **F #'88$($8$8 "%T%9%9D%@!A>B	
 	vi()
MMO 	DAq$Q-#a&A+!A$1CJqMB#z"~'<!$
2B3& B1#WRDPi!~~b),D7J1#RQ	" ..w7GKK((r"   c                    t         j                  dk\  rV|j                  dd        t        |j	                  d      t
              r%t         j                  j                  }||d      |d<   |S )N   r   readpreference)r2   version_tupler   r   r   r   read_preferences_MONGOS_MODES)r   rW   modess      r    r   zChannel._prepare_client_options_  s_      D(KK,d3'++&67=00>>,1':J2K,L()r"   c                    t        |fi |S r$   r   )r   	argumentsrQ   s      r    prepare_queue_argumentszChannel.prepare_queue_argumentsg  s    *9???r"   c                   | j                  |      \  }}}||d<   t               }|dk(  rddlm} |j	                          n|dk(  rddlm}  |        t        di |}||   }	|j                         d   }
|
j                  d	      d   }
t        t        t        |
j                  d
                  }|dk  rt        t        j                  |
            | j                   r#|dk  rt        t"        j                  |
            |	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r.   )r   r   r   r   	patch_allr   r   r   server_infor   tuplemapr   r   E_SERVER_VERSIONformatrB   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   s               r    _openzChannel._openj  s    !%!?&$V!#(?%J-N'$'	V$++-i8!'',Q/C!2!23!789V!"2"9"9+"FGGXX'F*!"2"9"9+"FGGr"   c                    | j                   |j                         v ry|j                  | j                   | j                  d       y)z0Create capped collection for broadcast messages.NT)r~   capped)rI   list_collection_namescreate_collectionrC   r   r   s     r    _create_broadcastzChannel._create_broadcast  sC    $$(F(F(HH""4#<#<(,(>(>*. 	# 	0r"   c                z   || j                      }|j                  g dd       || j                     j                  dg       || j                     }|j                  ddg       | j                  rJ|j                  dgd       |j                  dgd       || j
                     j                  dgd       y	y	)
zEnsure indexes on collections.)r^   r   )ra   r   )rT   r   T)
backgroundr   )ry   r   )rX   r   r   )expireAfterSecondsN)rG   create_indexrI   rH   rB   rJ   )r   r   r?   r   s       r    _ensure_indexeszChannel._ensure_indexes  s    D4457D 	 	
 	**+88,H4223lO<=88!!#3"4!K  "2!3 JT++,99!"q : :	 r"   c                j    | j                         }| j                  |       | j                  |       |S )zActually creates connection.)r   r   r   r   s     r    _create_clientzChannel._create_client  s.    ::<x(X&r"   c                "    | j                         S r$   )r   r'   s    r    rO   zChannel.client  s    ""$$r"   c                4    | j                   | j                     S r$   )rO   rG   r'   s    r    r?   zChannel.messages  s    {{43344r"   c                4    | j                   | j                     S r$   )rO   rH   r'   s    r    r   zChannel.routing  s    {{42233r"   c                4    | j                   | j                     S r$   )rO   rI   r'   s    r    rx   zChannel.broadcast  s    {{44455r"   c                4    | j                   | j                     S r$   )rO   rJ   r'   s    r    r[   zChannel.queues  s    {{41122r"   c                    	 | j                   |   S # t        $ r$ | j                  | j                  |   d d |      cY S w xY wr$   )rN   r   r   rd   )r   r^   s     r    re   zChannel._get_broadcast_cursor  sP    	**511 	 00##E*D$ 		s    *>>c                    t         j                  dk\  rd|it        j                  d}nd|idd} | j                  j
                  di |}t        |      x}| j                  |<   |S )Nr   r^   )filtercursor_typeT)querytailabler.   )r2   r   r   TAILABLErx   r   r   rN   )r   ry   rz   r   r^   r   r   rets           r    r   z Channel._create_broadcast_cursor  sv      E)"H-)22E "8, E
 %$$-u-/>v/FFd%%e,
r"   c                    |j                  di       j                  d      }|&| j                         t        t        |            z   S y )N
properties
expirationmilliseconds)r   get_nowr   r   )r   rs   values      r    rq   zChannel._get_message_expire  sB    L"-11,?<<>I3u:$FFF r"   c                    t        |t              r&| j                  j                  d|i      }|sy|d   }n|}	 |d   |   }| j                         t        |      z   S # t        t
        f$ r Y yw xY w)zGet expiration header named `argument` of queue definition.

        Note:
        ----
            `queue` must be either queue name or options itself.
        rT   NrW   r   r  )r   r4   r[   find_oner   	TypeErrorr  r   )r   r^   argumentdocrt   r  s         r    r]   zChannel._get_queue_expire  s     eS!++&&u~6Cy>DD	%h/E ||~	u === )$ 		s   A A10A1c                    | j                  |d      }|sy| j                  j                  d|idd|ii       | j                  j                  d|idd|ii       y)z,Update expiration field on queues documents.rV   Nr^   rU   rX   rT   )r]   r   update_manyr[   )r   r^   rX   s      r    ri   zChannel._update_queues_expire  sg    **5+>	  evY'?@	BENVk9%=>	@r"   c                H    t        j                  t        j                        S )zReturn current time in UTC.)r   nowr   utcr'   s    r    r  zChannel.get_now  s    ||HLL))r"   )
mongodb://)3r8   r9   r:   r;   supports_fanoutrd   rA   rB   r@   rC   rK   rD   rE   rF   rG   rH   rI   rJ   r   r=   from_transport_optionsr!   r_   rj   rm   rv   r{   r   r   r   r   r   r   r   r   r   r   r   r   rO   r?   r   rx   r[   re   r   rq   r]   ri   r  __classcell__)rR   s   @r    r=   r=   p   sO   O N C
COO"L&$+/)%ooDD H 3(	?'"6	
E(/(C)J@60:& % % 5 5 4 4 6 6 3 3	 G
>0
@*r"   r=   c                  N   e Zd ZdZeZdZdZej                  Zej                  j                  ej                  fz   Z
ej                  j                  ej                  ej                  fz   ZdZdZej                  j"                  j%                   eg d            Zd Zddd	Zy
)	TransportzMongoDB Transport.Tr   mongodbr2   )directtopicr   )exchange_typec                "    t         j                  S r$   )r2   r   r'   s    r    driver_versionzTransport.driver_version  s    r"   c                    |sy|r|S d|vrt        |      S |j                  dd      \  }}dj                  t        |      |g      S )Nr  ,r   )r   r   join)r   uriinclude_passwordmaskuri1	remainders         r    as_urizTransport.as_uri   sP    Jc>%c**))C+ixx+D19=>>r"   N)Fz**)r!  r4   returnr4   )r8   r9   r:   r;   r=   can_parse_urlpolling_intervalrE   r   r  connection_errorsr	   ConnectionFailurechannel_errorsr3   driver_typedriver_name
implementsextendr   r  r&  r.   r"   r    r  r    s    GM''L++v/G/G.II  	(($$##,% 	% 
 KK""--44 => 5 J
?r"   r  )%r;   
__future__r   r   r   r   r   r^   r   r2   r   r	   r
   pymongo.cursorr   kombu.exceptionsr   kombu.utils.compatr   kombu.utils.encodingr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.urlr    r   baser   r   r   r   r=   r  r.   r"   r    <module>r;     s   @ #  2 2   3 3 % , 2 - ) / .  -  
, ,^S*goo S*l$?!! $?r"   