Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

ProxyPullSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030 
00031 namespace OmniEvents {
00032 
00033 //
00034 //  ProxyPullSupplierManager
00035 //
00036 
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   // Evict the oldest proxy servant, if we have reached the maximum number.
00043   if(_servants.size()>=_channel.maxNumProxies())
00044   {
00045     ProxyPullSupplier_i* oldest =NULL;
00046     unsigned long        age    =0;
00047     for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048         if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049         {
00050           oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051           age=oldest->timestamp();
00052         }
00053     DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054     try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055   }
00056   // Make a new servant.
00057   ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058   _servants.insert(result);
00059   return result;
00060 }
00061 
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063   const EventChannel_i&   channel,
00064   PortableServer::POA_ptr parentPoa,
00065   EventQueue&             q
00066 )
00067 : ProxyManager(parentPoa,"ProxyPullSupplier"),
00068   _queue(q),
00069   _channel(channel)
00070 {
00071   // pass
00072 }
00073 
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076   DB(20,"~ProxyPullSupplierManager()")
00077 }
00078 
00079 CosEventChannelAdmin::ProxyPullSupplier_ptr
00080 ProxyPullSupplierManager::createObject()
00081 {  
00082   return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00083            _managedPoa.in(),
00084            CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00085          );
00086 }
00087 
00088 void ProxyPullSupplierManager::disconnect()
00089 {
00090   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00091   {
00092     ProxyPullSupplier_i* narrowed =dynamic_cast<ProxyPullSupplier_i*>(*i);
00093     narrowed->disconnect_pull_supplier();
00094   }
00095 }
00096 
00097 
00098 //
00099 //  ProxyPullSupplier_i
00100 //
00101 
00102 // CORBA interface methods
00103 
00104 void ProxyPullSupplier_i::connect_pull_consumer(
00105   CosEventComm::PullConsumer_ptr pullConsumer
00106 )
00107 {
00108   if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00109       throw CosEventChannelAdmin::AlreadyConnected();
00110   touch();
00111   _connected=true;
00112   if(!CORBA::is_nil(pullConsumer))
00113       _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00114 
00115   output(WriteLock().os);
00116 }
00117 
00118 void ProxyPullSupplier_i::disconnect_pull_supplier()
00119 {
00120   DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00121   touch();
00122   eraseKey("ConsumerAdmin/ProxyPullSupplier");
00123   deactivateObject();
00124   if(!_connected)
00125   {
00126     throw CORBA::OBJECT_NOT_EXIST(
00127       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00128       CORBA::COMPLETED_NO
00129     );
00130   }
00131   else if(!CORBA::is_nil(_target))
00132   {
00133     CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00134     req->send_deferred();
00135     Orb::inst().deferredRequest(req._retn());
00136   }
00137   _target=CosEventComm::PullConsumer::_nil();
00138 }
00139 
00140 CORBA::Any* ProxyPullSupplier_i::pull()
00141 {
00142   if(!_connected)
00143       throw CosEventComm::Disconnected();
00144   touch();
00145   if(moreEvents())
00146       return new CORBA::Any(*nextEvent());
00147   else
00148       throw CORBA::TRANSIENT(
00149         IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00150         CORBA::COMPLETED_NO
00151       );
00152 }
00153 
00154 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00155 {
00156   if(!_connected)
00157       throw CosEventComm::Disconnected();
00158   touch();
00159   if(moreEvents())
00160   {
00161     has_event=1;
00162     return new CORBA::Any(*nextEvent());
00163   }
00164   else
00165   {
00166     has_event=0;
00167     return new CORBA::Any();
00168   }
00169 }
00170 
00171 //
00172 
00173 ProxyPullSupplier_i::ProxyPullSupplier_i(
00174   PortableServer::POA_ptr poa,
00175   EventQueue& q
00176 )
00177 : Proxy(poa),
00178   EventQueue::Reader(q),
00179   _target(CosEventComm::PullConsumer::_nil()),
00180   _connected(false),
00181   _timestamp(0)
00182 {
00183   touch();
00184 }
00185 
00186 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00187 {
00188   DB(20,"~ProxyPullSupplier_i()")
00189 }
00190 
00191 void ProxyPullSupplier_i::reincarnate(
00192   const string&      oid,
00193   const PersistNode& node
00194 )
00195 {
00196   CosEventComm::PullConsumer_var pullConsumer =
00197     string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00198   // Do not activate until we know that we have read a valid target.
00199   activateObjectWithId(oid.c_str());
00200   connect_pull_consumer(pullConsumer.in());
00201 }
00202 
00203 void ProxyPullSupplier_i::output(ostream& os)
00204 {
00205   basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00206 }
00207 
00208 inline void ProxyPullSupplier_i::touch()
00209 {
00210   unsigned long nsec; // dummy
00211   omni_thread::get_time(&_timestamp,&nsec);
00212 }
00213 
00214 }; // end namespace OmniEvents

Generated on Fri Nov 19 17:42:20 2004 for OmniEvents by doxygen1.2.15