2016-01-04 36 views
36

Próbuję wstępnie pobrać dane treningu, aby ukryć opóźnienie we/wy. Chciałbym napisać niestandardowy kod w języku Python, który ładuje dane z dysku i preprocesuje dane (np. Dodając okno kontekstowe). Innymi słowy, jeden wątek przetwarza dane, a drugi wykonuje szkolenie. Czy to możliwe w TensorFlow?Jak wstępnie pobrać dane przy użyciu niestandardowej funkcji python w tensorflow

Aktualizacja: Mam działający przykład oparty na przykładzie @ mrry.

import numpy as np 
import tensorflow as tf 
import threading 

BATCH_SIZE = 5 
TRAINING_ITERS = 4100 

feature_input = tf.placeholder(tf.float32, shape=[128]) 
label_input = tf.placeholder(tf.float32, shape=[128]) 

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) 
enqueue_op = q.enqueue([label_input, feature_input]) 

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) 
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) 

sess = tf.Session() 

def load_and_enqueue(sess, enqueue_op, coord): 
    with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file: 
    while not coord.should_stop(): 
     feature_array = np.fromfile(feature_file, np.float32, 128) 
     if feature_array.shape[0] == 0: 
     print('reach end of file, reset using seek(0,0)') 
     feature_file.seek(0,0) 
     label_file.seek(0,0) 
     continue 
     label_value = np.fromfile(label_file, np.float32, 128) 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

coord = tf.train.Coordinator() 
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) 
t.start() 

for i in range(TRAINING_ITERS): 
    sum = sess.run(c) 
    print('train_iter='+str(i)) 
    print(sum) 

coord.request_stop() 
coord.join([t]) 
+3

Właśnie zrobiłem zeszyt o kolejkach, który wyjaśnia również podobny przypadek użycia, mam nadzieję, że może być przydatny również dla innych: https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 – AkiRoss

Odpowiedz

49

Jest to częsty przypadek użycia, a większość implementacji użyć TensorFlow za kolejki oddzielenie kodu przebiegu wyprzedzającego z kodu treningowej. Jest a tutorial on how to use queues, jednak główne etapy są następujące:

  1. Definiowanie kolejkę q, które są buforowane do przetworzonych danych. TensorFlow obsługuje prosty tf.FIFOQueue, który produkuje elementy w kolejności ich kolejkowania, oraz bardziej zaawansowany tf.RandomShuffleQueue, który wytwarza elementy w losowej kolejności. Element queue jest krotką jednego lub więcej tensorów (które mogą mieć różne typy i kształty). Wszystkie kolejki obsługują operacje jednoelementowe (enqueue, dequeue) i wsadowe (enqueue_many, dequeue_many), ale w celu użycia operacji wsadowych należy określić kształty każdego tensora w elemencie kolejki podczas konstruowania kolejki.

  2. Zbuduj podgraph, który zapisuje wstępnie przetworzone elementy w kolejce. Jednym ze sposobów na to byłoby zdefiniowanie opcji dla tensorów odpowiadających jednemu przykładowi wejściowemu, a następnie przekazanie ich do q.enqueue(). (Jeśli twój preprocesing tworzy partię na raz, powinieneś zamiast tego użyć q.enqueue_many().) Możesz również dołączyć TensorFlow ops do tego podgraphu.

  3. Zbuduj podgraph, który wykonuje trening. Będzie to wyglądać jak zwykły wykres TensorFlow, ale otrzyma dane wejściowe przez wywołanie q.dequeue_many(BATCH_SIZE).

  4. Rozpocznij sesję.

  5. Utwórz jeden lub więcej wątków, które wykonują logikę przetwarzania wstępnego, a następnie wykonaj operację wstawiania, podając dane wstępnie przetworzone. Może się przydać użyteczna klasa narzędziowa tf.train.Coordinator i tf.train.QueueRunner.

  6. Uruchom wykres treningowy (optymalizator itp.) Tak jak zwykle.

EDIT: Oto prosty load_and_enqueue() funkcja i fragment kodu, aby zacząć:

# Features are length-100 vectors of floats 
feature_input = tf.placeholder(tf.float32, shape=[100]) 
# Labels are scalar integers. 
label_input = tf.placeholder(tf.int32, shape=[]) 

# Alternatively, could do: 
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) 
# label_batch_input = tf.placeholder(tf.int32, shape=[None]) 

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) 
enqueue_op = q.enqueue([feature_input, label_input]) 

# For batch input, do: 
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) 

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) 
# Build rest of model taking label_batch, feature_batch as input. 
# [...] 
train_op = ... 

sess = tf.Session() 

def load_and_enqueue(): 
    with open(...) as feature_file, open(...) as label_file: 
    while True: 
     feature_array = numpy.fromfile(feature_file, numpy.float32, 100) 
     if not feature_array: 
     return 
     label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

# Start a thread to enqueue data asynchronously, and hide I/O latency. 
t = threading.Thread(target=load_and_enqueue) 
t.start() 

for _ in range(TRAINING_EPOCHS): 
    sess.run(train_op) 
+1

Dzięki za porady. Mam jeszcze pytanie. W moim eksperymencie funkcja szkolenia i etykieta są przechowywane w dwóch osobnych plikach binarnych. Czy powinienem zbudować dwie kolejki, jedną dla funkcji i jedną dla etykiety? Jeśli chcemy uzyskać losową parę (cechę, etykietę) z dwóch kolejek, w jaki sposób mogę się upewnić, że funkcja odpowiada poprawnej etykiecie? Innymi słowy, w jaki sposób mogę zagwarantować mapowanie jeden-do-jednego? –

+0

Aby zachować mapowanie jeden-do-jednego, powinieneś zbudować pojedynczą kolejkę, w której każdy element jest krotką tensora operacji i tensora etykiety. Można to zrobić, określając listę typów (i kształtów) konstruktora kolejki. Gwarantuje to, że komponenty tej samej krotki są zawsze odlepiane razem. – mrry

+0

Funkcje i etykiety są przechowywane oddzielnie w dwóch dużych plikach binarnych. Tak więc muszę zbudować feat_queue = tf.train.string_input_producer (feat_filenames) i label_queue = tf.train.string_input_producer (label_filenames). Następnie będę miał dwa tf.FixedLengthRecordReader, aby osobno pobrać feat_queue i etykietę z label_queue. W końcu dodaję [feat, label] do kolejnej kolejki. Oto problem. Kiedy używam FixedLengthRecordReader, aby uzyskać wyczyn i etykietę, czy są one zawsze poprawnie odwzorowane? –

6

Innymi słowy, jeden wątek robi przerób dane, a drugi robi szkolenia. Czy to możliwe w TensorFlow?

Tak, jest. Rozwiązanie firmy Mrrry działa, ale jest prostsze.

pobierania danych

tf.py_func owija funkcję pytona i wykorzystuje ją jako operator TensorFlow. Dzięki temu za każdym razem możemy załadować dane pod sess.run().Problem z tym podejściem polega na tym, że dane są ładowane podczas głównego wątku przez sess.run().

minimalny przykład:

def get_numpy_tensor(): 
    return np.array([[1,2],[3,4]], dtype=np.float32) 
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32) 

bardziej złożony przykład:

def get_numpy_tensors(): 
    # Load data from the disk into numpy arrays. 
    input = np.array([[1,2],[3,4]], dtype=np.float32) 
    target = np.int32(1) 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target 

sess = tf.InteractiveSession() 
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) 
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2 

danych preselekcji na inny wątek

nasze dane w kolejce, w innym wątku (tak, że nie będzie sess.run() muszą czekać na dane), możemy użyć tf.train.batch() na naszych operatorach z tf.py_func().

minimalny przykład:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) 
# Run `tf.train.start_queue_runners()` once session is created. 

Możemy pominąć argumentu shapes jeśli tensorflow_tensor ma swój kształt określony:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensor.set_shape(tensor_shape) 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) 
# Run `tf.train.start_queue_runners()` once session is created. 

bardziej złożony przykład:

input_shape, target_shape = (2, 2),() 
def get_numpy_tensors(): 
    input = np.random.rand(*input_shape).astype(np.float32) 
    target = np.random.randint(10, dtype=np.int32) 
    print('f', end='') 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 
batch_size = 2 
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) 
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. 

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets 

sess = tf.InteractiveSession() 
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. 
for _ in range(10): 
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) 
    assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) 
    print('r', end='') 

# Prints `fffffrrffrfrffrffrffrffrffrffrf`. 

W przypadku get_numpy_tensor() zwraca partię tensorów, n tf.train.batch(..., enqueue_many=True) pomoże.