TensorFlow 1.15 multi-worker strategy hangs after graph initialization on multiple machines


I am running the TF keras_to_estimator example on two machines, and the process hangs after graph initialization when I run the start script on each machine.

The console output on the worker 0 machine after starting:

INFO:tensorflow:Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['node4:21111', 'node3:21112']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO
I0605 17:05:20.218733 139934274328320 collective_all_reduce_strategy.py:310] Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['node4:21111', 'node3:21112']}, task_type = 'worker', task_id = 0, num_workers = 2, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.AUTO
INFO:tensorflow:Updated config: {'_model_dir': '/node4/jianwang/atp_bert/albert_zh/example_dir', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f44402717b8>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f44402755c0>, '_experimental_distribute': DistributeConfig(train_distribute=<tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4440275240>, eval_distribute=<tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f44402755c0>, remote_cluster=None), '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f4440275940>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://node4:21111', '_evaluation_master': 'grpc://node4:21111', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 2, '_distribute_coordinator_mode': 'independent_worker'}
I0605 17:05:20.221589 139934274328320 estimator_training.py:228] Updated config: {'_model_dir': '/node4/jianwang/atp_bert/albert_zh/example_dir', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f44402717b8>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f44402755c0>, '_experimental_distribute': DistributeConfig(train_distribute=<tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f4440275240>, eval_distribute=<tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f44402755c0>, remote_cluster=None), '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f4440275940>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://node4:21111', '_evaluation_master': 'grpc://node4:21111', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 2, '_distribute_coordinator_mode': 'independent_worker'}
input_fn called
INFO:tensorflow:Calling model_fn.
I0605 17:05:20.358438 139911839606528 estimator.py:1148] Calling model_fn.
...
INFO:tensorflow:Creating chief session creator with config: device_filters: "/job:worker/task:0"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"
}

I0605 17:05:20.711247 139934274328320 distribute_coordinator.py:251] Creating chief session creator with config: device_filters: "/job:worker/task:0"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"
}

INFO:tensorflow:Graph was finalized.
I0605 17:05:20.870544 139934274328320 monitored_session.py:240] Graph was finalized.

The same messages are also printed on the worker 1 machine, which likewise shows that the process is stuck after graph initialization:

I0605 17:10:28.616780 140121708521216 collective_all_reduce_strategy.py:310] Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'worker': ['node4:21111', 'node3:21112']}, task_type = 'worker', task_id = 1, num_workers = 2, local_devices = ('/job:worker/task:1',), communication = CollectiveCommunication.AUTO
INFO:tensorflow:Updated config: {'_model_dir': '/node4/jianwang/atp_bert/albert_zh/example_dir', '_num_ps_replicas': 0, '_tf_random_seed': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_experimental_max_worker_delay_secs': None, '_eval_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f7085a28128>, '_save_checkpoints_secs': 600, '_keep_checkpoint_every_n_hours': 10000, '_is_chief': False, '_keep_checkpoint_max': 5, '_device_fn': None, '_experimental_distribute': DistributeConfig(train_distribute=<tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f7085a1eef0>, eval_distribute=<tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f7085a28128>, remote_cluster=None), '_session_creation_timeout_secs': 7200, '_master': 'grpc://node3:21112', '_service': None, '_task_type': 'worker', '_task_id': 1, '_protocol': None, '_log_step_count_steps': 100, '_distribute_coordinator_mode': 'independent_worker', '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f7085a282e8>, '_global_id_in_cluster': 1, '_evaluation_master': 'grpc://node3:21112', '_train_distribute': <tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f7085a1e8d0>, '_num_worker_replicas': 2, '_save_checkpoints_steps': None, '_save_summary_steps': 100}
I0605 17:10:28.623507 140121708521216 estimator_training.py:228] Updated config: {'_model_dir': '/node4/jianwang/atp_bert/albert_zh/example_dir', '_num_ps_replicas': 0, '_tf_random_seed': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_experimental_max_worker_delay_secs': None, '_eval_distribute': <tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f7085a28128>, '_save_checkpoints_secs': 600, '_keep_checkpoint_every_n_hours': 10000, '_is_chief': False, '_keep_checkpoint_max': 5, '_device_fn': None, '_experimental_distribute': DistributeConfig(train_distribute=<tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f7085a1eef0>, eval_distribute=<tensorflow.contrib.distribute.python.mirrored_strategy.MirroredStrategy object at 0x7f7085a28128>, remote_cluster=None), '_session_creation_timeout_secs': 7200, '_master': 'grpc://node3:21112', '_service': None, '_task_type': 'worker', '_task_id': 1, '_protocol': None, '_log_step_count_steps': 100, '_distribute_coordinator_mode': 'independent_worker', '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f7085a282e8>, '_global_id_in_cluster': 1, '_evaluation_master': 'grpc://node3:21112', '_train_distribute': <tensorflow.contrib.distribute.python.collective_all_reduce_strategy.CollectiveAllReduceStrategy object at 0x7f7085a1e8d0>, '_num_worker_replicas': 2, '_save_checkpoints_steps': None, '_save_summary_steps': 100}
input_fn called
INFO:tensorflow:Calling model_fn.
...

INFO:tensorflow:Creating chief session creator with config: device_filters: "/job:worker/task:1"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"
}

I0605 17:10:29.048442 140121708521216 distribute_coordinator.py:251] Creating chief session creator with config: device_filters: "/job:worker/task:1"
allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
    scoped_allocator_optimization: ON
    scoped_allocator_opts {
      enable_op: "CollectiveReduce"
    }
  }
}
experimental {
  collective_group_leader: "/job:worker/replica:0/task:0"

Related code: (1) example.sh (the start script run on node4, the worker 0 machine)

export TF_CONFIG='{
"cluster": {
"worker": ["node4:21111", "node3:21112"]
},
"task": {"type": "worker", "index": 0}
}'
export CUDA_VISIBLE_DEVICES=0
export OUTPUT_DIR=/node4/jianwang/atp_bert/albert_zh/example_dir
python example.py $OUTPUT_DIR

(2) example_slave.sh (start script to run on the worker 1 machine)

export TF_CONFIG='{
"cluster": {
"worker": ["node4:21111", "node3:21112"]
},
"task": {"type": "worker", "index": 1}
}'
export CUDA_VISIBLE_DEVICES=7
export OUTPUT_DIR=/node4/jianwang/atp_bert/albert_zh/example_dir
python example.py $OUTPUT_DIR

(3) example.py

"""An example of training Keras model with multi-worker strategies."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys

import numpy as np
import tensorflow as tf


def input_fn():
  print("input_fn called")
  x = np.random.random((1024, 10))
  y = np.random.randint(2, size=(1024, 1))
  x = tf.cast(x, tf.float32)
  dataset = tf.data.Dataset.from_tensor_slices((x, y))
  dataset = dataset.repeat(100)
  dataset = dataset.batch(32)
  return dataset


def main(args):
  if len(args) < 2:
    print('You must specify model_dir for checkpoints such as'
          ' /tmp/tfkeras_example/.')
    return

  model_dir = args[1]
  print('Using %s to store checkpoints.' % model_dir)

  # Define a Keras Model.
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,)))
  model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

  # Compile the model.
  optimizer = tf.train.GradientDescentOptimizer(0.2)
  model.compile(loss='binary_crossentropy', optimizer=optimizer)
  model.summary()
  tf.keras.backend.set_learning_phase(True)

  # Define DistributionStrategies and convert the Keras Model to an
  # Estimator that utilizes these DistributionStrateges.
  # Evaluator is a single worker, so using MirroredStrategy.
  config = tf.estimator.RunConfig(
      experimental_distribute=tf.contrib.distribute.DistributeConfig(
          train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy(
              ),
          eval_distribute=tf.contrib.distribute.MirroredStrategy(
              )))
  keras_estimator = tf.keras.estimator.model_to_estimator(
      keras_model=model, config=config, model_dir=model_dir)

  # Train and evaluate the model. Evaluation will be skipped if there is not an
  # "evaluator" job in the cluster.
  print("Start train eval")
  tf.estimator.train_and_evaluate(
      keras_estimator,
      train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
      eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))


if __name__ == '__main__':
  tf.logging.set_verbosity(tf.logging.INFO)
  tf.app.run(argv=sys.argv)

lspci output on node3:

jianwang@node3:~$ lspci |grep PCI
00:01.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 1 (rev 01)
00:02.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 2 (rev 01)
00:03.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 3 (rev 01)
00:1c.0 PCI bridge: Intel Corporation C610/X99 series chipset PCI Express Root Port #1 (rev d5)
00:1c.7 PCI bridge: Intel Corporation C610/X99 series chipset PCI Express Root Port #8 (rev d5)
02:00.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
03:08.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
03:10.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
06:00.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
07:08.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
07:10.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
0b:00.0 PCI bridge: ASPEED Technology, Inc. AST1150 PCI-to-PCI Bridge (rev 03)
7f:10.0 System peripheral: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D R2PCIe Agent (rev 01)
7f:10.1 Performance counters: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D R2PCIe Agent (rev 01)
80:00.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 0 (rev 01)
80:01.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 1 (rev 01)
80:02.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 2 (rev 01)
80:03.0 PCI bridge: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D PCI Express Root Port 3 (rev 01)
83:00.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
84:08.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
84:10.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
87:00.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
88:08.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
88:10.0 PCI bridge: PLX Technology, Inc. PEX 8747 48-Lane, 5-Port PCI Express Gen 3 (8.0 GT/s) Switch (rev ca)
ff:10.0 System peripheral: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D R2PCIe Agent (rev 01)
ff:10.1 Performance counters: Intel Corporation Xeon E7 v4/Xeon E5 v4/Xeon E3 v4/Xeon D R2PCIe Agent (rev 01)

I have tried adding a "chief" task to TF_CONFIG (roughly as sketched below) and disabling the IOMMU (following: disable IOMMU).
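
To be concrete, by "adding chief" I mean a cluster layout roughly like the following sketch, written in Python rather than the shell exports above. The chief port 21110 and the exact layout are placeholders I am using here for illustration, not the verbatim config I ran:

# Sketch of TF_CONFIG with an extra "chief" task.
# Assumption: port 21110 and this layout are placeholders for illustration.
import json
import os

cluster = {
    "chief": ["node4:21110"],
    "worker": ["node4:21111", "node3:21112"],
}

# On the chief process:
os.environ["TF_CONFIG"] = json.dumps(
    {"cluster": cluster, "task": {"type": "chief", "index": 0}})

# On the worker processes the "task" entry would instead be
# {"type": "worker", "index": 0} or {"type": "worker", "index": 1}.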

Neither worked. Please help with: (1) how to diagnose where it hangs, and (2) any insights on how to work around this problem.
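
For (1), the only low-level check I can think of so far is confirming that each machine can actually open the other worker's gRPC port, e.g. with a small script like the one below (hostnames and ports taken from my TF_CONFIG; this is just a plain TCP connect, not a TensorFlow-level check):

# Plain TCP reachability check between the workers.
# Assumption: if a worker cannot reach the other worker's gRPC port,
# the collective ops would block at roughly the point where my run hangs.
from __future__ import print_function

import socket

CLUSTER = {"worker": ["node4:21111", "node3:21112"]}

for addr in CLUSTER["worker"]:
    host, port = addr.split(":")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(5)
    try:
        s.connect((host, int(port)))
        print("reachable:", addr)
    except (socket.error, socket.timeout) as exc:
        print("NOT reachable:", addr, exc)
    finally:
        s.close()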
