@@ -407,7 +407,7 @@ object DiscoveryService {
407
407
case false =>
408
408
initBond(peer).flatMap {
409
409
case Some (result) =>
410
- result.pongReceived.get
410
+ result.pongReceived.get.timeoutTo(config.requestTimeout, Task .pure( false ))
411
411
412
412
case None =>
413
413
Task (logger.debug(s " Trying to bond with $peer... " )) >>
@@ -433,6 +433,7 @@ object DiscoveryService {
433
433
_ <- completePong(peer, responded = false )
434
434
} yield false
435
435
}
436
+ .guarantee(stateRef.update(_.clearBondingResults(peer)))
436
437
}
437
438
}
438
439
@@ -563,7 +564,7 @@ object DiscoveryService {
563
564
564
565
waitOrFetch.flatMap {
565
566
case Left (wait) =>
566
- wait.get
567
+ wait.get.timeoutTo(config.requestTimeout, Task .pure( None ))
567
568
568
569
case Right (fetch) =>
569
570
val maybeEnr = bond(peer).flatMap {
@@ -729,44 +730,55 @@ object DiscoveryService {
729
730
case true =>
730
731
rpc
731
732
.findNode(peer)(target)
732
- .map(_.map(_.toList).getOrElse(Nil ))
733
+ .flatMap {
734
+ case None =>
735
+ Task (logger.debug(s " Received no response for neighbors for $target from ${peer.address}" )).as(Nil )
736
+ case Some (neighbors) =>
737
+ Task (logger.debug(s " Received ${neighbors.size} neighbors for $target from ${peer.address}" ))
738
+ .as(neighbors.toList)
739
+ }
733
740
.flatMap { neighbors =>
734
741
neighbors.filterA { neighbor =>
735
742
if (neighbor.address.checkRelay(peer))
736
743
Task .pure(true )
737
744
else
738
- Task (logger.debug(s " Ignoring neighbor $neighbor from $peer because of invalid relay IP. " )).as(false )
745
+ Task (logger.debug(s " Ignoring neighbor $neighbor from ${peer.address} because of invalid relay IP. " ))
746
+ .as(false )
739
747
}
740
748
}
741
749
.recoverWith {
742
750
case NonFatal (ex) =>
743
- Task (logger.debug(s " Failed to fetch neighbors of $target from $from : $ex" )).as(Nil )
751
+ Task (logger.debug(s " Failed to fetch neighbors of $target from ${peer.address} : $ex" )).as(Nil )
744
752
}
745
753
case false =>
746
- Task (logger.debug(s " Could not bond with $from to fetch neighbors of $target" )).as(Nil )
754
+ Task (logger.debug(s " Could not bond with ${peer.address} to fetch neighbors of $target" )).as(Nil )
747
755
}
748
756
}
749
757
750
758
// Make sure these new nodes can be bonded with before we consider them,
751
759
// otherwise they might appear to be be closer to the target but actually
752
760
// be fakes with unreachable addresses that could knock out legit nodes.
753
- def bondNeighbors (neighbors : Seq [Node ]): Task [Seq [Node ]] = {
754
- Task
755
- .parTraverseUnordered(neighbors) { neighbor =>
756
- bond(toPeer(neighbor)).flatMap {
757
- case true =>
758
- Task .pure(Some (neighbor))
759
- case false =>
760
- Task (logger.debug(s " Could not bond with neighbor candidate $neighbor" )).as(None )
761
+ def bondNeighbors (neighbors : Seq [Node ]): Task [Seq [Node ]] =
762
+ for {
763
+ _ <- Task (logger.debug(s " Bonding with ${neighbors.size} neighbors... " ))
764
+ bonded <- Task
765
+ .parTraverseN(config.kademliaAlpha)(neighbors) { neighbor =>
766
+ bond(toPeer(neighbor)).flatMap {
767
+ case true =>
768
+ Task .pure(Some (neighbor))
769
+ case false =>
770
+ Task (logger.debug(s " Could not bond with neighbor candidate $neighbor" )).as(None )
771
+ }
761
772
}
762
- }
763
- .map(_.flatten )
764
- }
773
+ .map(_.flatten)
774
+ _ <- Task (logger.debug( s " Bonded with ${bonded.size} neighbors out of ${neighbors.size} . " ) )
775
+ } yield bonded
765
776
766
777
def loop (
767
778
local : Node ,
768
779
closest : SortedSet [Node ],
769
- asked : Set [Node ]
780
+ asked : Set [Node ],
781
+ neighbors : Set [Node ]
770
782
): Task [SortedSet [Node ]] = {
771
783
// Contact the alpha closest nodes to the target that we haven't asked before.
772
784
val contacts = closest
@@ -775,24 +787,32 @@ object DiscoveryService {
775
787
.take(config.kademliaAlpha)
776
788
.toList
777
789
778
- if (contacts.isEmpty)
779
- Task .pure(closest)
780
- else {
781
- Task
782
- .parTraverseUnordered(contacts)(fetchNeighbors)
783
- .map(_.flatten.distinct)
784
- .flatMap(bondNeighbors)
785
- .flatMap { newNeighbors =>
786
- val newClosest = (closest ++ newNeighbors).take(config.kademliaBucketSize)
787
- val newAsked = asked ++ contacts
788
- loop(local, newClosest, newAsked)
789
- }
790
+ if (contacts.isEmpty) {
791
+ Task (
792
+ logger.debug(s " Lookup for $target finished; asked ${asked.size} nodes, found ${neighbors.size} neighbors. " )
793
+ ).as(closest)
794
+ } else {
795
+ Task (
796
+ logger.debug(s " Lookup for $target contacting ${contacts.size} new nodes; asked ${asked.size} nodes so far. " )
797
+ ) >>
798
+ Task
799
+ .parTraverseUnordered(contacts)(fetchNeighbors)
800
+ .map(_.flatten.distinct.filterNot(neighbors))
801
+ .flatMap(bondNeighbors)
802
+ .flatMap { newNeighbors =>
803
+ val nextClosest = (closest ++ newNeighbors).take(config.kademliaBucketSize)
804
+ val nextAsked = asked ++ contacts
805
+ val nextNeighbors = neighbors ++ newNeighbors
806
+ val newClosest = nextClosest diff closest
807
+ Task (logger.debug(s " Lookup for $target found ${newClosest.size} neighbors closer than before. " )) >>
808
+ loop(local, nextClosest, nextAsked, nextNeighbors)
809
+ }
790
810
}
791
811
}
792
812
793
813
init.flatMap {
794
814
case (localNode, closestNodes) =>
795
- loop(localNode, closest = SortedSet (closestNodes : _* ), asked = Set (localNode))
815
+ loop(localNode, closest = SortedSet (closestNodes : _* ), asked = Set (localNode), neighbors = closestNodes.toSet )
796
816
}
797
817
}
798
818
@@ -825,7 +845,9 @@ object DiscoveryService {
825
845
_ <- Task (
826
846
logger.info(s " Successfully enrolled with $enrolled bootstrap nodes. Performing initial lookup... " )
827
847
)
828
- _ <- lookup(nodeId)
848
+ _ <- lookup(nodeId).doOnFinish {
849
+ _.fold(Task .unit)(ex => Task (logger.error(s " Error during initial lookup " , ex)))
850
+ }
829
851
nodeCount <- stateRef.get.map(_.nodeMap.size)
830
852
_ <- Task (logger.info(s " Discovered $nodeCount nodes by the end of the lookup. " ))
831
853
} yield ()
0 commit comments