2012-05-15 18 views
17

Ponieważ inicjalizacja usługi i portu klienta WS trwa wieki, chciałbym zainicjować je raz podczas uruchamiania i ponownie użyć tego samego wystąpienia portu. Initalization będzie wyglądać mniej więcej tak:Czy ten wątek połączenia klienta JAX-WS jest bezpieczny?

private static RequestContext requestContext = null; 

static 
{ 
    MyService service = new MyService(); 
    MyPort myPort = service.getMyServicePort(); 

    Map<String, Object> requestContextMap = ((BindingProvider) myPort).getRequestContext(); 
    requestContextMap = ((BindingProvider)myPort).getRequestContext(); 
    requestContextMap.put(BindingProvider.USERNAME_PROPERTY, uName); 
    requestContextMap.put(BindingProvider.PASSWORD_PROPERTY, pWord); 

    rc = new RequestContext(); 
    rc.setApplication("test"); 
    rc.setUserId("test"); 
} 

Wezwanie gdzieś w mojej klasie:

myPort.someFunctionCall(requestContext, "someValue"); 

Moje pytanie: Czy to wezwanie być bezpieczny wątku?

Jonny

+3

już tutaj odpowiedzi: http://stackoverflow.com/questions/4385204/are-jax-ws-clients-thread-safe – kyiu

+0

Hi KHY , Dziękuję za szybką odpowiedź. Widziałem ten wątek. Mój problem polega na tym, że brakuje mi jakiegokolwiek (oficjalnego) stwierdzenia, co jest wątków bezpieczne, czy nie (usługa/port/itp.). Moja przypadek użycia różni się także od drugiego wątku. Jonny – user871611

+1

Oto odpowiedź znaleziona na stronie CXF: https://cwiki.apache.org/CXF/faq.html#FAQ-AreJAXWSclientproxiesthreadsafe%253F – kyiu

Odpowiedz

19

Według CXF FAQ:

Czy JAX-WS proxy klienta wątek bezpieczne?

Dziennik JAX-WS odpowiedź: Nr Według specyfikacji JAX-WS, pełnomocnicy klienckie NIE wątku bezpieczne. Aby pisać kod przenośny, należy traktować je jako bezpieczne i bezpieczne, a także używać puli instancji lub podobnych.

Odpowiedź CXF: Serwery proxy CXF są bezpieczne dla wielu zastosowań. W wyjątkami są:

  • Zastosowanie ((BindingProvider)proxy).getRequestContext() - za JAX-WS specyfikacji, kontekst żądanie jest w danym przypadku. W związku z tym wszystko, co tam zostanie ustawione, wpłynie na żądania innych wątków. Z CXF, można zrobić:

    ((BindingProvider)proxy).getRequestContext().put("thread.local.request.context","true"); 
    

    i przyszłych połączeń do getRequestContext() będzie używać nici lokalny kontekst żądania. Dzięki temu kontekst żądania może być bezpieczny dla wątków . (Uwaga: kontekst odpowiedź jest zawsze wątku lokalny CXF)

  • Ustawienia na przewodzie - jeśli użyć kodu lub konfiguracji bezpośrednio manipulować przewód (jak ustawić ustawienia TLS lub podobny), ci nie są wątku bezpieczny. Kanał jest na wystąpienie i dlatego te ustawienia będą wspólne. Ponadto, jeśli użyjesz funkcji FailoverFeature i LoadBalanceFeatures, kanał zostanie zastąpiony w locie. W związku z tym ustawienia ustawione w kanale mogą zostać utracone przed użyciem w gnieździe ustalającym .

  • Obsługa sesji - jeśli włączysz obsługę sesji (patrz spec), plik cookie sesji jest przechowywany w kanale. W związku z tym wpadłby w powyższe zasady dotyczące ustawień przewodów, a tym samym byłby wspólny dla wszystkich wątków.
  • WS-Security tokens - Jeśli używasz WS-SecureConversation lub WS-Trust, pobrany token jest buforowany w Endpoint/Proxy, aby uniknąć dodatkowych (i drogich) połączeń do STS w celu uzyskania tokenów. Tak więc, wiele wątków będzie udostępniać token. Jeśli każdy wątek ma inne poświadczenia bezpieczeństwa lub wymagania, muszą być używane osobne instancje proxy .

W przypadku problemów z przewodami, MOŻESZ zainstalować nowy ConduitSelector , który używa wątku lokalnego lub podobnego. Jest to jednak kompleks złożony z .

Dla większości "prostych" przypadków użycia można używać proxy CXF na wielu wątkach . Powyższe opisuje obejścia dla innych.

3

Ogólnie rzecz biorąc nie.

Według CXF FAQ http://cxf.apache.org/faq.html#FAQ-AreJAX-WSclientproxiesthreadsafe?

Dziennik JAX-WS odpowiedź: Nr Według specyfikacji JAX-WS, klient proxy NIE są bezpieczne dla wątków. Aby napisać kod przenośny, należy traktować je jako bezpieczny i bezpieczny, a także korzystać z puli instancji lub podobnych.

Odpowiedź CXF: Serwery proxy CXF są bezpieczne dla wielu zastosowań.

Lista wyjątków znajduje się w FAQ.

3

Jak widać z powyższych odpowiedzi, że serwery proxy JAX-WS nie są bezpieczne dla wątków, chciałem tylko udostępnić moją implementację innym, aby buforować proxy klientów. Rzeczywiście stanąłem w obliczu tego samego problemu i postanowiłem utworzyć komponent sprężynowy, który wykonuje buforowanie proxy klienta JAX-WS. Można zobaczyć więcej szczegółów http://programtalk.com/java/using-spring-and-scheduler-to-store/

import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

import javax.annotation.PostConstruct; 

import org.apache.commons.lang3.concurrent.BasicThreadFactory; 
import org.apache.logging.log4j.Logger; 
import org.springframework.stereotype.Component; 

/** 
* This keeps the cache of MAX_CUNCURRENT_THREADS number of 
* appConnections and tries to shares them equally amongst the threads. All the 
* connections are created right at the start and if an error occurs then the 
* cache is created again. 
* 
*/ 
/* 
* 
* Are JAX-WS client proxies thread safe? <br/> According to the JAX-WS spec, 
* the client proxies are NOT thread safe. To write portable code, you should 
* treat them as non-thread safe and synchronize access or use a pool of 
* instances or similar. 
* 
*/ 
@Component 
public class AppConnectionCache { 

private static final Logger logger = org.apache.logging.log4j.LogManager.getLogger(AppConnectionCache.class); 

private final Map<Integer, MyService> connectionCache = new ConcurrentHashMap<Integer, MyService>(); 

private int cachedConnectionId = 1; 

private static final int MAX_CUNCURRENT_THREADS = 20; 

private ScheduledExecutorService scheduler; 

private boolean forceRecaching = true; // first time cache 

@PostConstruct 
public void init() { 
    logger.info("starting appConnectionCache"); 
    logger.info("start caching connections"); ;; 
    BasicThreadFactory factory = new BasicThreadFactory.Builder() 
    .namingPattern("appconnectioncache-scheduler-thread-%d").build(); 
    scheduler = Executors.newScheduledThreadPool(1, factory); 

    scheduler.scheduleAtFixedRate(new Runnable() { 
    @Override 
    public void run() { 
    initializeCache(); 
    } 

    }, 0, 10, TimeUnit.MINUTES); 

} 

public void destroy() { 
    scheduler.shutdownNow(); 
} 

private void initializeCache() { 
    if (!forceRecaching) { 
    return; 
    } 
    try { 
    loadCache(); 
    forceRecaching = false; // this flag is used for initializing 
    logger.info("connections creation finished successfully!"); 
    } catch (MyAppException e) { 
    logger.error("error while initializing the cache"); 
    } 
} 

private void loadCache() throws MyAppException { 
    logger.info("create and cache appservice connections"); 
    for (int i = 0; i < MAX_CUNCURRENT_THREADS; i++) { 
    tryConnect(i, true); 
    } 
} 

public MyPort getMyPort() throws MyAppException { 
    if (cachedConnectionId++ == MAX_CUNCURRENT_THREADS) { 
    cachedConnectionId = 1; 
    } 
    return tryConnect(cachedConnectionId, forceRecaching); 
} 

private MyPort tryConnect(int threadNum, boolean forceConnect) throws MyAppException { 
    boolean connect = true; 
    int tryNum = 0; 
    MyPort app = null; 
    while (connect && !Thread.currentThread().isInterrupted()) { 
    try { 
    app = doConnect(threadNum, forceConnect); 
    connect = false; 
    } catch (Exception e) { 
    tryNum = tryReconnect(tryNum, e); 
    } 
    } 
    return app; 
} 

private int tryReconnect(int tryNum, Exception e) throws MyAppException { 
    logger.warn(Thread.currentThread().getName() + " appservice service not available! : " + e); 
    // try 10 times, if 
    if (tryNum++ < 10) { 
    try { 
    logger.warn(Thread.currentThread().getName() + " wait 1 second"); 
    Thread.sleep(1000); 
    } catch (InterruptedException f) { 
    // restore interrupt 
    Thread.currentThread().interrupt(); 
    } 
    } else { 
    logger.warn(" appservice could not connect, number of times tried: " + (tryNum - 1)); 
    this.forceRecaching = true; 
    throw new MyAppException(e); 
    } 
    logger.info(" try reconnect number: " + tryNum); 
    return tryNum; 
} 

private MyPort doConnect(int threadNum, boolean forceConnect) throws InterruptedException { 
    MyService service = connectionCache.get(threadNum); 
    if (service == null || forceConnect) { 
    logger.info("app service connects : " + (threadNum + 1)); 
    service = new MyService(); 
    connectionCache.put(threadNum, service); 
    logger.info("connect done for " + (threadNum + 1)); 
    } 
    return service.getAppPort(); 
} 
} 
0

Ogólne rozwiązanie tego problemu jest użycie wielu obiektów klienta w basenie, a następnie korzystać z serwera proxy, który działa jako fasady.

import org.apache.commons.pool2.BasePooledObjectFactory; 
import org.apache.commons.pool2.PooledObject; 
import org.apache.commons.pool2.impl.DefaultPooledObject; 
import org.apache.commons.pool2.impl.GenericObjectPool; 

import java.lang.reflect.InvocationHandler; 
import java.lang.reflect.Method; 
import java.lang.reflect.Proxy; 

class ServiceObjectPool<T> extends GenericObjectPool<T> { 
     public ServiceObjectPool(java.util.function.Supplier<T> factory) { 
      super(new BasePooledObjectFactory<T>() { 
       @Override 
       public T create() throws Exception { 
        return factory.get(); 
       } 
      @Override 
      public PooledObject<T> wrap(T obj) { 
       return new DefaultPooledObject<>(obj); 
      } 
     }); 
    } 

    public static class PooledServiceProxy<T> implements InvocationHandler { 
     private ServiceObjectPool<T> pool; 

     public PooledServiceProxy(ServiceObjectPool<T> pool) { 
      this.pool = pool; 
     } 


     @Override 
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
      T t = null; 
      try { 
       t = this.pool.borrowObject(); 
       return method.invoke(t, args); 
      } finally { 
       if (t != null) 
        this.pool.returnObject(t); 
      } 
     } 
    } 

    @SuppressWarnings("unchecked") 
    public T getProxy(Class<? super T> interfaceType) { 
     PooledServiceProxy<T> handler = new PooledServiceProxy<>(this); 
     return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(), 
              new Class<?>[]{interfaceType}, handler); 
    } 
} 

Aby korzystać z serwera proxy:

ServiceObjectPool<SomeNonThreadSafeService> servicePool = new ServiceObjectPool<>(createSomeNonThreadSafeService); 
nowSafeService = servicePool .getProxy(SomeNonThreadSafeService.class);