something wrong with my keras's embedding's test

120 views Asked by At

Here is my python's code runned on pyspark with distkeras.

from keras.layers.embeddings import Embedding
from keras.layers.recurrent import LSTM, GRU
from pyspark.ml.feature import VectorAssembler
from keras.models import Sequential
from keras.layers.core import *
from distkeras.trainers import *
from pyspark import SQLContext


sqlContext = SQLContext(sc)
reader = sqlContext
raw_dataset = reader.read.format('com.databricks.spark.csv').options(header='false', inferSchema='true').load(
    '/home/hpcc/test/11.csv')
# raw_dataset = raw_dataset.repartition(4)
features = raw_dataset.columns
features.remove('C0')
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = vector_assembler.transform(raw_dataset)
dataset = dataset.select("features", "C0")
model1 = Sequential()
model1.add(Embedding(input_dim=52965, output_dim=256))
model1.add(LSTM(128))
model1.add(Dropout(0.5))
model1.add(Dense(1))
model1.add(Activation('sigmoid'))
model1.summary()
optimizer_mlp= 'adagrad'
loss_mlp = 'binary_crossentropy'
dataset.cache()
trainer = DOWNPOUR(keras_model=model1, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=4,
                   batch_size=16, communication_window=5, learning_rate=0.1, num_epoch=1,
                   features_col="features", label_col="C0")
trainer.set_parallelism_factor(1)
trained_model = trainer.train(dataset)

But it print error like this:

16/12/30 20:53:09 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 7)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "build/bdist.linux-x86_64/egg/distkeras/workers.py", line 193, in train
    X = np.asarray([x[self.features_column] for x in feature_iterator])
  File "/root/anaconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 769, in __getitem__
    raise ValueError("Index %d out of bounds." % index)
ValueError: Index 50 out of bounds.

My 11.csv is like this :

1,23,5211,871,1223,11,5,355,9999,1,28032,33057,1259,5575,4,1,9604,23790,83,136,18,4695,3,121,1,151,521,2,130,24677,4,1,612,947,612,1645,4534,59,29008,1,21679,1,416,3474,3,4189,1,142,5,11323,3
........

It's C0 column is label,and C1 to C50 are features. Hope for your answer.Thanks a lots.

Here is dataset.printSchema():

root
 |-- features: vector (nullable = true)
 |-- C0: integer (nullable = true)
0

There are 0 answers