您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14818|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    ( O' B4 w9 O' O
  2. require_once __DIR__ . '/Workerman/Autoloader.php';' `9 ~, z2 Y  ?. r! Y. o

  3. . N, p6 \2 J7 o% G$ O/ D  `1 Q  t
  4. $worker = new Worker();+ ~7 L# k+ Y) E! a
  5. // 4个进程( @3 Q' [/ Q. K7 ^
  6. $worker->count = 4;
    - o# S, z! m+ ?1 o+ v1 \
  7. // 每个进程启动后在当前进程新增一个Worker监听" U" U9 `1 [/ c" i
  8. $worker->onWorkerStart = function($worker)' w3 [5 |1 @$ V) S% g8 x4 N- F4 ?
  9. {  V0 z) G* B" o4 @: ^9 _
  10.     /**
    , e2 u7 `3 O+ \8 X
  11.      * 4个进程启动的时候都创建2016端口的Worker: p! i: e/ c) U7 L' y6 u6 ]( t9 G( c
  12.      * 当执行到worker->listen()时会报Address already in use错误
    8 r! R. x# G8 Q; X5 Q8 W
  13.      * 如果worker->count=1则不会报错5 j5 u, Y# c% y
  14.      */
    ; _' N" D7 z8 L1 g
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    2 A' G+ f* ~0 f/ D0 R5 E$ ~. H8 r
  16.     $inner_worker->onMessage = 'on_message';
    / h$ n4 X: y" ^* L7 O( e9 p3 l4 W( t
  17.     // 执行监听。这里会报Address already in use错误
    & R) N& [. o2 I' d' B0 P
  18.     $inner_worker->listen();) L$ ^) ], h  `, U2 n
  19. };
    $ W1 j; x8 _4 w7 R
  20. # U' I$ k$ N7 T6 N3 Y8 f
  21. $worker->onMessage = 'on_message';
    + }4 Q# U3 @  b0 f/ b

  22. , Z- A+ g- w6 B& i9 T8 `1 j
  23. function on_message($connection, $data)
    9 y$ O# @+ i, w; v
  24. {+ D3 O$ i0 }, X2 ]9 V* i4 Q6 g
  25.     $connection->send("hello\n");
    3 Y7 L( `9 T' b
  26. }
    : _3 U8 C$ V7 m" Q9 w  @

  27. 6 {. l; Q& `$ K9 m
  28. // 运行worker
    ( g5 ]2 S" Q) S- C- {; A5 v& b$ ?1 O
  29. Worker::runAll();
    ! I/ X  `* n6 t# z' Q
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:& ]* J( a: @0 @  g

  31. : x/ n$ C3 ~/ K; s/ V" \
  32. use Workerman\Worker;
    4 u/ o+ r% p6 M$ y' C+ r
  33. require_once './Workerman/Autoloader.php';" o* Z8 V' W' r* A4 I( H- b

  34. " v3 T; Y% A# `+ E
  35. $worker = new Worker('text://0.0.0.0:2015');4 Y1 M+ H7 m$ e5 a
  36. // 4个进程
    2 S5 N/ _0 m  k1 }6 n! a
  37. $worker->count = 4;
    6 g$ `0 a4 s/ d4 x$ ~' M
  38. // 每个进程启动后在当前进程新增一个Worker监听
    $ G1 C7 {4 L: f5 ^" r0 `4 M1 s8 S
  39. $worker->onWorkerStart = function($worker)9 ^5 Q' |# h/ D- C' `
  40. {; z+ p0 H/ J1 ^. j$ k
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    , A# J9 `7 T. v" X
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)* Q; G/ ?: n0 Q
  43.     $inner_worker->reusePort = true;
    4 z7 W8 l% t; w5 H
  44.     $inner_worker->onMessage = 'on_message';
      {, w% I/ W# {. T
  45.     // 执行监听。正常监听不会报错
      L" K) t, ^: X7 y
  46.     $inner_worker->listen();
    / a+ j1 ~8 m" x* j( W: r' r! p2 J
  47. };) t/ B5 `, o& m' z

  48. ) u+ p* ~0 T! D9 |1 r6 C% l
  49. $worker->onMessage = 'on_message';! p5 P- \0 r- t" J* `# b  D, Y) p

  50. $ p  i! a) ]7 n3 W9 n
  51. function on_message($connection, $data)
    & ^6 s) r  f5 O" E& C
  52. {' p. F) ]" r' ]; ]+ \
  53.     $connection->send("hello\n");
    1 u' V0 \- i4 w/ m( q$ y
  54. }
    . z( R' O: t! d$ }$ m
  55. 8 U6 ~. d5 b, t! R, Y
  56. // 运行worker
    * `' G& D$ }# R  K: d. p
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    % Q- \8 N$ g( x( B" R
  2. use Workerman\Worker;5 N. z- t2 I6 u$ H  F* Y; ~# ]" L
  3. require_once './Workerman/Autoloader.php';/ }( s, y0 x6 P
  4. // 初始化一个worker容器,监听1234端口
    8 U) I& A. i9 z: v) l& o' B
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ! M4 k! W3 \$ n/ |7 ?( N: Z
  6. 3 b2 M. o1 U, |' g
  7. /*& h# Z( i' H. }% E" I# b
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误3 s8 Z) o/ Q: G* j
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    " t9 c  i$ C0 u1 U4 q# [
  10. */3 |1 f% n/ W( P: O6 U7 O7 {* o# S
  11. $worker->count = 1;0 v% y1 q. P, V: B- N( M" C
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    5 _( u: M2 l' \8 [, P$ {, `' O
  13. $worker->onWorkerStart = function($worker)
    2 X# u( _1 g8 D' C7 `0 ^
  14. {8 b) z5 }; r, R! y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) c  W3 A( J9 x
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    3 x, S" r9 m0 c, A/ Q
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    8 i& f/ Z0 n  H9 y0 O
  18.     {- G& A3 }9 v% f5 J* g
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据. k$ O5 j) g$ e! ^% [( {
  20.         $data = json_decode($buffer, true);; Q2 V, @" z% W0 _$ u
  21.         $uid = $data['uid'];
    5 [( o0 I4 H# C- H
  22.         // 通过workerman,向uid的页面推送数据6 B$ `8 R5 T1 ~$ _9 ]
  23.         $ret = sendMessageByUid($uid, $buffer);  M& T% f8 ]7 d
  24.         // 返回推送结果
    6 l! B8 h' S, s- ]& E
  25.         $connection->send($ret ? 'ok' : 'fail');
    - i! l: H( d) R6 q, {3 T
  26.     };
    ) V# ~# d* F/ a; T* @, A% \5 i
  27.     // ## 执行监听 ##
    9 l( W) Y/ [' D- i- g6 \! i. ?
  28.     $inner_text_worker->listen();
    & @6 r1 ]* O* g# t5 Q/ h9 i
  29. };. f4 Z2 _. y( j( L& [
  30. // 新增加一个属性,用来保存uid到connection的映射( }4 C, Z$ J6 r) e
  31. $worker->uidConnections = array();
    , {0 `2 w1 K. y0 z
  32. // 当有客户端发来消息时执行的回调函数, K. q- w5 x+ d3 n
  33. $worker->onMessage = function($connection, $data)5 s) L1 \8 {! c" L* X* R8 c
  34. {
    : |3 S7 U: q0 j; q. n3 x8 D* F
  35.     global $worker;
    5 g" S7 A8 f% q4 y) V+ f
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    8 K, `# F! F  W, S6 n
  37.     if(!isset($connection->uid))1 b# X% M  N( ~# g  ^/ q8 l) {
  38.     {
    - v' Q2 {/ v% T! r0 X) a
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ) C& {. p# {' {
  40.        $connection->uid = $data;
    ( A9 E. C2 u. a$ }
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# f% R* Z! e" d- A9 R+ d" ~
  42.         * 实现针对特定uid推送数据6 l1 i8 O5 Q$ Z* a
  43.         */' |8 i' K+ f* A4 H! y8 @
  44.        $worker->uidConnections[$connection->uid] = $connection;4 e7 m( B3 D4 ?$ o# d- r! @
  45.        return;: K6 Q4 n  v3 P1 U& r$ t
  46.     }
    . F% E( p3 O$ ]1 I
  47. };" X* M9 Y3 E: [3 \. J( K
  48. + Z) D) f0 T+ O: ?" C# v3 V
  49. // 当有客户端连接断开时0 y  D! C8 [- R) x6 A( H
  50. $worker->onClose = function($connection)7 Z: E! y* r' a0 W- \) j% f
  51. {
    # S7 K/ G7 d: R  t) b5 E
  52.     global $worker;
    2 b0 x1 H9 o: e. G
  53.     if(isset($connection->uid))7 H3 x, G) }" d+ X7 W! Q
  54.     {
    5 B  G3 \/ I/ f
  55.         // 连接断开时删除映射7 L% h& @, _+ Y4 j* U6 X
  56.         unset($worker->uidConnections[$connection->uid]);0 m' O& A/ f( Z" d4 f8 F
  57.     }/ x& B$ G  O" O+ G  N+ u5 A0 n
  58. };% r, h$ k& g) W% ?# P* X

  59. ; e% {- `- o  \9 i' J0 S
  60. // 向所有验证的用户推送数据
    8 e5 j' d4 C' T3 [- e7 e: y, F
  61. function broadcast($message)
    : c7 r2 q( |( V# c9 D3 O
  62. {( |: d0 m, t9 B2 u( e4 W
  63.    global $worker;
    * T6 A5 g" b0 Y1 i7 g' ~3 \
  64.    foreach($worker->uidConnections as $connection)
    ! J: y1 V6 A# @/ d3 ^4 g! x  L
  65.    {
    ' r9 a8 _: Q; H& B- W
  66.         $connection->send($message);
    6 u) S, T2 j* }/ f& X: Z! Z# ~( s
  67.    }6 w  {; R% p( `* k$ M
  68. }5 a1 R- R' M% W, `# T, C  J
  69. 0 {9 D" P9 L( g2 t; R' C* _
  70. // 针对uid推送数据
      O. C; ^5 ?9 `; c4 U* d( K# K
  71. function sendMessageByUid($uid, $message)* |" n) o6 Q% S2 Y7 r$ p
  72. {
    7 N" r9 U" ]5 Q+ F4 ]& c7 q, N/ m
  73.     global $worker;
    & f8 P2 v9 x+ `2 R3 N
  74.     if(isset($worker->uidConnections[$uid]))' M2 H9 w/ i0 _3 t
  75.     {
    ; T5 Y! H* Y; I1 e' K) n& e& l
  76.         $connection = $worker->uidConnections[$uid];
    - h0 H2 X) a2 S- o+ Y
  77.         $connection->send($message);+ V5 m9 r  C5 Z8 n
  78.         return true;
    * Y& a% E- [6 K, I8 [$ C
  79.     }( `+ w& Z" V" e
  80.     return false;
    8 W. p! s  S+ V1 s0 H# g/ P, p
  81. }1 L$ l) g5 G# ~: m0 ]

  82. 9 `+ y- g$ r7 U- I. O7 |( p% T# U. Z
  83. // 运行所有的worker: q) \9 P3 E. f  \3 L
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');% m+ z; M, S8 B  a9 j
  2. ws.onopen = function(){
    + ^$ {3 v, F" S- h$ L7 ?% z
  3.     var uid = 'uid1';
    9 U( f3 j  u5 F) k
  4.     ws.send(uid);" A9 z5 H: c. p6 u' ?3 R
  5. };
    1 T: U* {! [4 l* J8 q) u
  6. ws.onmessage = function(e){
    0 ^: K' N, v+ `! g0 }1 s
  7.     alert(e.data);
    ; m1 a' \5 x1 ^3 q. J
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口1 y$ w( Z1 Z3 X7 @4 l
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 C+ G( y: P+ B3 `. h, t" k' j
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    0 L! b* g4 U  C) k1 X: Q9 K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    6 o' b6 k0 K% Y7 g
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    & ^3 i/ l. s* a
  6. fwrite($client, json_encode($data)."\n");
    9 q! R# c: {0 F+ l
  7. // 读取推送结果( f$ b4 s) R/ N" I
  8. echo fread($client, 8192);
复制代码

/ i. |8 ~8 K9 H! |" S  D+ ~: q4 @/ \  b7 N) s
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 22:03 , Processed in 0.079155 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!