您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10350|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;8 C% N1 L; e7 u# K# \
  2. require_once __DIR__ . '/Workerman/Autoloader.php';* `* G2 R& R' h5 W4 W5 i, b8 D
  3. ; L! a  i' c* V
  4. $worker = new Worker();/ a' C; j- t! C8 G  x( ?
  5. // 4个进程
    , _0 [5 d4 t; g. D- C
  6. $worker->count = 4;
    ) D( \$ \, w) F: B# p
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ) w) a! s  G5 E; @" D# N
  8. $worker->onWorkerStart = function($worker)% X! y1 S- d* ]* C5 n3 ?+ ^5 s; C3 P
  9. {
    / e6 N8 m/ E0 W$ W9 x; v: M
  10.     /**
    * H, y+ \" w) e" L* h+ T, q. t
  11.      * 4个进程启动的时候都创建2016端口的Worker, p9 \5 [2 o( [5 D* i6 x' g
  12.      * 当执行到worker->listen()时会报Address already in use错误8 s9 o0 ]% a) D
  13.      * 如果worker->count=1则不会报错
      s- z9 E# q5 }
  14.      */$ q4 q+ Z3 O" Q+ n: a
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    4 E( d/ z( U7 B
  16.     $inner_worker->onMessage = 'on_message';
    ; Z* ^  I3 b2 ~9 ^
  17.     // 执行监听。这里会报Address already in use错误1 d7 k4 S$ V- t$ W% ~3 O( i
  18.     $inner_worker->listen();
    / o6 F8 ]  a' q6 y! d3 q
  19. };0 j; {% F! r2 ?* U) F5 q. J; D
  20. 1 N3 q9 @) ]7 i
  21. $worker->onMessage = 'on_message';
    5 ^4 [8 t/ e" A/ d- m9 n( y5 ^
  22. , ]; R; n4 J, M1 `. X, s* o
  23. function on_message($connection, $data)
    7 f; J( I! a; [7 a# `
  24. {
    7 v0 m, U$ ^* m  w* j
  25.     $connection->send("hello\n");
    # O4 S8 D; A- a! A
  26. }
    + d+ C- P. o& f0 f
  27. 7 s3 Q- n) w3 L  @/ P! k
  28. // 运行worker
    - T# P$ u- E4 h; |* s  y
  29. Worker::runAll();
    ( m, J( l3 d. G" d$ V
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 r8 F2 ], e! \9 F/ T( q

  31. & V4 r3 G1 F5 V# \6 e
  32. use Workerman\Worker;
    + t/ C" u2 D. r# R7 k$ I" Q
  33. require_once './Workerman/Autoloader.php';
    + X' y( e$ Q- U

  34. 3 Z% ]" ]4 ]+ U1 i+ S/ u6 M
  35. $worker = new Worker('text://0.0.0.0:2015');4 z5 c. c( P0 G% A  h& O
  36. // 4个进程# N0 e( Y) U/ j
  37. $worker->count = 4;
    / \0 e( G) [; W. }. [
  38. // 每个进程启动后在当前进程新增一个Worker监听
    # m% p" r. k& }0 }6 o
  39. $worker->onWorkerStart = function($worker)% T: M: I8 W% b9 P" u, s6 D- n, x
  40. {
    7 O. e3 _% r, t1 Z& j' T0 }
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    7 n- j, w" g9 `3 t# q' E
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    / v8 X! e$ l/ ~: H- C7 L6 y* O
  43.     $inner_worker->reusePort = true;
      d: t2 ^8 w4 J7 S; N) K
  44.     $inner_worker->onMessage = 'on_message';5 p8 D4 V6 l/ D  c1 p0 _
  45.     // 执行监听。正常监听不会报错$ Q; x1 d: x6 `; m
  46.     $inner_worker->listen();; O0 o# Z+ U6 ]) a
  47. };
    3 l: \# @5 ?/ N* e0 ?" n0 U  _& I
  48.   }* U% a; W0 O# \
  49. $worker->onMessage = 'on_message';
    6 s9 d" m6 I8 m2 T8 [' j# B
  50. " ?6 p' l" u" N/ N* n$ F% U
  51. function on_message($connection, $data)5 N1 p2 ]  z+ N3 v, \1 O
  52. {
    # c: K, _) x1 E# ?
  53.     $connection->send("hello\n");
    7 d8 {/ t/ `- i! ~, b. j& _$ N
  54. }+ ]/ @! O$ c. X7 g+ G2 ?
  55. 5 f- Y& v9 A" X" E/ U6 _. b/ m1 Z
  56. // 运行worker' \6 J9 K4 T5 y
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php6 q& L' w1 i9 A/ c% K  i
  2. use Workerman\Worker;
    4 b! a7 E* l/ e
  3. require_once './Workerman/Autoloader.php';
    : s/ }% G% b/ A2 L" X/ [( E
  4. // 初始化一个worker容器,监听1234端口* {$ e8 {5 i% s, R3 V. ^
  5. $worker = new Worker('websocket://0.0.0.0:1234');: P7 r% S$ x; a* R0 j* |* h
  6. ' m- r  s0 f1 ?0 \
  7. /*" Y3 X& R+ T, u; B4 `8 c
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误, U) K7 `# u4 m4 j0 t4 ~" `* D6 P
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! V) k3 d: J# Z6 b$ r+ k7 w
  10. */
    - H6 R9 o5 B  e  @8 y% v* s  g
  11. $worker->count = 1;
    6 A+ @. ?) l6 G" m- q0 b' }& h" P
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    8 J$ e$ |* Q& H. I) a* N. b1 V
  13. $worker->onWorkerStart = function($worker)
    # B* p) g/ P! e, {5 o# R" ]9 p# N
  14. {# ?; M" _" J9 j3 b7 S
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    " [0 y6 \. q# f9 f
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');5 q! W. {4 |7 R. `/ x$ A/ @
  17.     $inner_text_worker->onMessage = function($connection, $buffer)6 O  U; N8 R& f& w
  18.     {
    ' j- _2 Z. |. x6 G  }4 m, I9 e
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    $ _4 y4 K( u% \
  20.         $data = json_decode($buffer, true);9 u" a8 T7 a2 u& O2 [) m( c
  21.         $uid = $data['uid'];
    " b7 `  k  v7 m4 s
  22.         // 通过workerman,向uid的页面推送数据- R4 Z+ X& |# {
  23.         $ret = sendMessageByUid($uid, $buffer);9 M) d* o% r: O- b
  24.         // 返回推送结果8 }% K; N5 O: i! g
  25.         $connection->send($ret ? 'ok' : 'fail');1 V" A; A/ R& ~
  26.     };# e2 \1 `2 H/ z( S5 n% l6 s
  27.     // ## 执行监听 ##3 n0 p( _+ y1 N3 a& v7 S4 {" v
  28.     $inner_text_worker->listen();
    ) B( C( w7 Y; y/ Z% H6 c6 e
  29. };
    . {+ w: P4 V% w* J- E$ F% V+ ?
  30. // 新增加一个属性,用来保存uid到connection的映射
      u- t! z% p" n' H' t8 _% s$ y
  31. $worker->uidConnections = array();; Q! ]1 |" b6 G# X: W4 k
  32. // 当有客户端发来消息时执行的回调函数; |1 o: V3 Q* w# o( T
  33. $worker->onMessage = function($connection, $data)7 ?3 R+ l% N: V5 v- h; Y& q. n( p3 Y9 B/ B
  34. {4 n( D' |# x6 N! C
  35.     global $worker;
    $ @; R  \9 A7 K- m& N
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    : l  S! \; i" j: Q* z
  37.     if(!isset($connection->uid))' Q0 A/ ]( M: e! Q7 m5 z  `
  38.     {/ o5 K6 {* Y: S5 q! g
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)% R% C% w6 y$ _  `' W2 y3 s4 ^
  40.        $connection->uid = $data;; i0 ~' G$ U* f9 u
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,  T& M0 B$ D% C2 k" C- j* [- \
  42.         * 实现针对特定uid推送数据
    * W* w: J2 S' d+ Y, {0 j
  43.         *// y; g: [/ i. A1 S! }4 z) H6 j& y) q; d
  44.        $worker->uidConnections[$connection->uid] = $connection;
      M3 ^1 b4 T, x  a7 q* }, y! y* h* F
  45.        return;
    $ u) d' C6 O' w+ G3 G
  46.     }: n& H# V( V0 J0 g8 o2 Q
  47. };5 |* L  j0 [5 e* A/ b' f5 L5 L! T5 D
  48. ; N; W% ]; _) _6 n
  49. // 当有客户端连接断开时/ R8 M8 _% ?# y$ G
  50. $worker->onClose = function($connection)0 D0 y7 Z7 C4 W. J7 q9 Z7 o
  51. {# x; I' J4 B9 x  M% O0 W3 Y+ d
  52.     global $worker;
    4 b9 D# a9 k% b$ x8 ^$ ?) h
  53.     if(isset($connection->uid))$ }) ?$ G& v" N+ E" _! ?9 z$ T
  54.     {
    4 Q; |5 `: M0 Q
  55.         // 连接断开时删除映射# u" c" E0 b4 ]- K5 K4 L; V# R- z
  56.         unset($worker->uidConnections[$connection->uid]);4 K7 \" q& G$ @9 v
  57.     }
    5 n: J( i& @2 o) o8 c0 U& `9 t' X9 `
  58. };1 q+ Z3 G* N& q9 G7 ^$ _. U0 I4 D
  59. " `9 k$ r; \# W7 G& p
  60. // 向所有验证的用户推送数据4 ?6 H% `! Z/ o6 C4 F5 t- L' U
  61. function broadcast($message)* `+ S$ B4 w9 V+ ~
  62. {$ {0 {1 g+ c- l2 m$ e' [- d3 n
  63.    global $worker;# T* f* `$ W& @/ `5 `6 u2 w  V6 d
  64.    foreach($worker->uidConnections as $connection). b3 h5 P3 G, P
  65.    {7 y& }- _4 D, r- O5 v2 }$ w
  66.         $connection->send($message);5 R3 `  x8 Q( Y& r& V
  67.    }" ^  ~* g$ I/ p1 V) v1 I+ ~
  68. }
    ) G. a+ j1 V; a$ ^9 U! F2 N

  69. 1 n$ |! g) V: Q' w: K
  70. // 针对uid推送数据
    6 J& T' p7 j/ j
  71. function sendMessageByUid($uid, $message)0 z6 H# ]# g! a8 L8 y4 ?
  72. {1 Z0 O7 c, n: H# B0 N
  73.     global $worker;( z, r2 }# G: H
  74.     if(isset($worker->uidConnections[$uid]))
    : S! \& v0 w; v- K$ G$ W8 o
  75.     {
    ) Z% ^& T- W( Y
  76.         $connection = $worker->uidConnections[$uid];
    % l. {1 P' y: Q7 Z$ O! f, h
  77.         $connection->send($message);
    $ k! j9 r  i# C- ]" }
  78.         return true;& g. r) R" K9 I4 h* K; k& `) o
  79.     }$ a2 _( U" n; \5 u6 s+ i5 \! i
  80.     return false;* A4 o, V% P: v7 U* W. F: L& D
  81. }
    " C1 L4 |7 N& \" m" A

  82. / e! ^2 r% h6 b. K
  83. // 运行所有的worker
    , \5 i, e- `  _2 {, O' f
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');8 T. }3 A: o/ [7 z- Z9 G- }
  2. ws.onopen = function(){8 F* v9 l* C* C1 Q9 f
  3.     var uid = 'uid1';
    2 Q4 U- G8 y+ N! A, L6 j/ c
  4.     ws.send(uid);
    " s. n* U/ a# j& g2 U! N& a
  5. };# }+ F& S# `0 o. W
  6. ws.onmessage = function(e){
    " p% f# S- l8 n0 j# ^& [( A$ G
  7.     alert(e.data);
    0 Y0 F6 p" s% S1 G3 R+ q2 t
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    # r( q: {' o  f7 Q. s1 Z+ P+ [
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);$ u$ x# J- a) u5 S2 L
  3. // 推送的数据,包含uid字段,表示是给这个uid推送' G% h9 u- K  g1 n+ u/ t; W
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');2 t0 b, b5 ^" O$ {- H) K
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    + `$ E% u* e  z1 J- e# z8 ^3 v6 w
  6. fwrite($client, json_encode($data)."\n");9 N) j  P% l0 I' e* e* R' t
  7. // 读取推送结果& Q6 Z2 v4 f# S: W$ `
  8. echo fread($client, 8192);
复制代码

3 y4 l( |) _- k! A( V
4 g5 ^9 \8 @. t
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-6 12:05 , Processed in 0.105212 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!